# Nothing
## ----include = FALSE----------------------------------------------------------
# Chunk defaults for the rendered document: collapse source and output into a
# single block, and prefix every output line with "#>".
knitr::opts_chunk$set(collapse = TRUE, comment = "#>")
library(DBmaps)
library(data.table)
## ----load_data, message=FALSE-------------------------------------------------
# Convert the `time` column of the two event tables to POSIXct in UTC. The
# assignment uses data.table's `:=`, so each table is updated by reference.
# The format string matches timestamps shaped like "YYYY-MM-DDTHH:MM:SSZ".
# NOTE(review): `customers`, `products`, `transactions` and `views` are not
# created anywhere in this script -- presumably they are datasets shipped with
# DBmaps; confirm before running this outside the package.
transactions[, time := as.POSIXct(
  time,
  format = "%Y-%m-%dT%H:%M:%OSZ", tz = "UTC"
)]
views[, time := as.POSIXct(
  time,
  format = "%Y-%m-%dT%H:%M:%OSZ", tz = "UTC"
)]

cat("--- All 4 Raw Data Tables Loaded Successfully ---\n")

# Preview the first five rows of every table, each under its own banner.
cat("---Customers Data---\n")
print(head(customers, n = 5))

cat("---products Data---\n")
print(head(products, n = 5))

cat("---Transactions Data---\n")
print(head(transactions, n = 5))

cat("---Views Data---\n")
print(head(views, n = 5))
## ----define_metadata----------------------------------------------------------
# One table_info() call per table. Each call supplies the table name, its
# source file, its identifier column(s), and a list of outcome specs. Every
# outcome spec names a value expression and the aggregations offered for it
# (output name, aggregation function, grouping variables).

# customers: a constant 1 per row, summed within region.
customers_info <- table_info(
  table_name = "customers",
  source_identifier = "customers.csv",
  identifier_columns = "customer_id",
  key_outcome_specs = list(
    list(
      OutcomeName = "CustomerCount",
      ValueExpression = 1,
      AggregationMethods = list(
        list(
          AggregatedName = "CountByRegion",
          AggregationFunction = "sum",
          GroupingVariables = "region"
        )
      )
    )
  )
)

# products: a constant 1 per row, summed within category.
products_info <- table_info(
  table_name = "products",
  source_identifier = "products.csv",
  identifier_columns = "product_id",
  key_outcome_specs = list(
    list(
      OutcomeName = "ProductCount",
      ValueExpression = 1,
      AggregationMethods = list(
        list(
          AggregatedName = "ProductsPerCategory",
          AggregationFunction = "sum",
          GroupingVariables = "category"
        )
      )
    )
  )
)

# transactions: two outcomes. Revenue is price * quantity and is offered at
# three groupings; UnitsSold is quantity and is offered per customer.
transactions_info <- table_info(
  table_name = "transactions",
  source_identifier = "transactions.csv",
  identifier_columns = c("customer_id", "product_id", "time"),
  key_outcome_specs = list(
    list(
      OutcomeName = "Revenue",
      ValueExpression = quote(price * quantity),
      AggregationMethods = list(
        list(
          AggregatedName = "RevenueByCustomer",
          AggregationFunction = "sum",
          GroupingVariables = "customer_id"
        ),
        list(
          AggregatedName = "RevenueByProduct",
          AggregationFunction = "sum",
          GroupingVariables = "product_id"
        ),
        list(
          AggregatedName = "DailyRevenueByCustomerProduct",
          AggregationFunction = "sum",
          GroupingVariables = c("customer_id", "product_id", "time")
        )
      )
    ),
    list(
      OutcomeName = "UnitsSold",
      ValueExpression = quote(quantity),
      AggregationMethods = list(
        list(
          AggregatedName = "TotalUnitsByCustomer",
          AggregationFunction = "sum",
          GroupingVariables = "customer_id"
        )
      )
    )
  )
)

# views: a constant 1 per row, counted per product and per customer.
views_info <- table_info(
  table_name = "views",
  source_identifier = "views.csv",
  identifier_columns = c("customer_id", "product_id", "time"),
  key_outcome_specs = list(
    list(
      OutcomeName = "ViewCount",
      ValueExpression = 1,
      AggregationMethods = list(
        list(
          AggregatedName = "ViewsByProduct",
          AggregationFunction = "count",
          GroupingVariables = "product_id"
        ),
        list(
          AggregatedName = "ViewsByCustomer",
          AggregationFunction = "count",
          GroupingVariables = "customer_id"
        )
      )
    )
  )
)

# Show what the transactions metadata looks like.
cat("---Metadata for transactions---\n")
print(transactions_info)
## ----creating-registry, echo = TRUE-------------------------------------------
# Start from an empty registry and fold each table's metadata into it, in the
# order customers, products, views, transactions. Reduce() with an initial
# value evaluates add_table(add_table(registry, first), second) and so on.
meta <- Reduce(
  add_table,
  list(customers_info, products_info, views_info, transactions_info),
  create_metadata_registry()
)
print(meta)
## ----map_paths, message=TRUE--------------------------------------------------
# Bundle the data tables into a list keyed by table name; the same list is
# reused later when the plan is executed.
all_tables <- list(
  customers = customers, products = products,
  transactions = transactions, views = views
)

# Build the join map from the registry and the tables, then display it.
paths <- map_join_paths(meta, all_tables)
print(paths)
## ----create_plan--------------------------------------------------------------
# What the final table should contain, keyed by source table:
#   products     -> its own columns product_id and category
#   transactions -> the RevenueByProduct aggregate
#   views        -> the ViewsByProduct aggregate
selections <- list(
  products = c("product_id", "category"),
  transactions = "RevenueByProduct",
  views = "ViewsByProduct"
)

# Ask for a join plan anchored on the products table.
plan <- create_join_plan(
  base_table  = "products",
  selections  = selections,
  metadata_dt = meta,
  join_map    = paths
)
print(plan)
## ----setup_invalid------------------------------------------------------------
# Build a separate, deliberately mismatched registry for the failure example.
# It holds only a placeholder products entry and a transactions entry.

# Placeholder product metadata used only by this example.
products_meta <- table_info(
  "products",
  "p.csv",
  "product_id",
  list(
    list(
      OutcomeName = "x",
      ValueExpression = 1,
      AggregationMethods = list(
        list(
          AggregatedName = "y",
          AggregationFunction = "z",
          GroupingVariables = "category"
        )
      )
    )
  )
)

# Transactions metadata whose only aggregate is grouped by product_id
# (not customer_id).
transactions_meta_v2 <- table_info(
  "transactions",
  "t.csv",
  "trans_id",
  list(
    list(
      OutcomeName = "Revenue",
      ValueExpression = quote(price * qty),
      AggregationMethods = list(
        list(
          AggregatedName = "RevenueByProduct",
          AggregationFunction = "sum",
          GroupingVariables = "product_id"
        )
      )
    )
  )
)

invalid_metadata <- create_metadata_registry()
invalid_metadata <- add_table(invalid_metadata, products_meta)
invalid_metadata <- add_table(invalid_metadata, transactions_meta_v2)

# The invalid request: a customer-level base column combined with a
# product-level transactions aggregate.
invalid_selections <- list(
  customers = "customer_id",
  transactions = "RevenueByProduct"
)
## ----run_invalid, error=TRUE--------------------------------------------------
# Attempt to plan the invalid request. try() reports any error raised by
# create_join_plan() without aborting the rest of the script.
try(
  create_join_plan(
    base_table = "customers",
    selections = invalid_selections,
    metadata_dt = invalid_metadata
  )
)
## ----plot_plan, fig.align='center', results='asis', fig.width=10--------------
# The plot is only drawn when the optional DiagrammeR package is installed;
# otherwise a hint is printed instead.
if (!requireNamespace("DiagrammeR", quietly = TRUE)) {
  cat("Install the 'DiagrammeR' package to visualize the join plan.")
} else {
  # Build the plot object and leave it as the chunk's value so it is displayed.
  visualize <- plot_join_plan(plan)
  visualize
}
## ----execute_plan, message=TRUE-----------------------------------------------
# Run the plan against the named list of tables to produce the merged table.
final_dt <- execute_join_plan(plan, all_tables)

# Preview the leading rows of the result (head()'s default of six rows).
print(head(final_dt, n = 6L))
# Any scripts or data that you put into this service are public.
# Add the following code to your website.
# For more information on customizing the embed code, read Embedding Snippets.