The ETL project provides functions for building data pipelines with BigQuery, Google Cloud Storage and AWS S3. The safeLookup() function, for example, is an alternative to merge() that enforces a safe left join.
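For illustration, a minimal sketch of how safeLookup() might be called; the argument names and signature below are assumptions and may differ from the actual function:
library(data.table)
library(retl)

orders   <- data.table(id = 1:3, product_id = c(10L, 20L, 10L))
products <- data.table(product_id = c(10L, 20L), name = c("chair", "table"))

# Hypothetical call: left join products onto orders by product_id,
# failing loudly instead of silently duplicating or dropping rows
orders <- safeLookup(orders, products, by = "product_id")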
To access BigQuery you will need the following environment variables:
# .Renviron
BIGQUERY_PROJECT # name of the project in BigQuery. Default project cannot be changed in the code.
BIGQUERY_DATASET # name of the default dataset in BigQuery. You can override default dataset in most functions.
BIGQUERY_ACCESS_TOKEN_PATH # path to the json token file.
BIGQUERY_LEGACY_SQL # query will be executed with legacy flavour if set to `TRUE`.
The BigQuery functions wrap bigrquery functions to provide a higher-level API that removes the boilerplate of the lower-level API.
You can parameterise your SQL with sprintf-style positional matching if you don't name the arguments in the call to bqExecuteQuery():
# Running query to get the size of group A in Legacy dialect
dt <- bqExecuteQuery("SELECT COUNT(*) as size FROM my_table WHERE group = '%1$s'", "A")
# You can also save the query template in a file and get results like this
dt <- bqExecuteFile("group-size.sql", "A")
With standard SQL you can use named parameters in the query template. Give matching names to the arguments in the bqExecuteQuery() call:
# Running query to get the size of group A
dt <- bqExecuteQuery(
  sql = "SELECT COUNT(*) as size FROM my_table WHERE group = @group",
  group = "A",
  use.legacy.sql = FALSE
)
In the example above, the group argument is matched with the @group parameter and BigQuery executes the following query:
SELECT
  COUNT(*) as size
FROM
  my_table
WHERE
  group = 'A' -- parameter is replaced by matching argument in the call
You can also pass an array into the query like this:
# Running query to get the size of groups A and B
dt <- bqExecuteQuery(
  sql = "SELECT COUNT(*) as size FROM my_table WHERE group IN UNNEST(@group)",
  group = c("A", "B"),
  use.legacy.sql = FALSE
)
Sometimes you don't know in advance the length of the vector that will be passed into a query.
UNNEST in the query above expects an array, which is not created for scalars.
To avoid an error in dynamic scripts (e.g. Shiny), you can enforce an array with bq_param_array() like this:
# Running query to get the size of groups A and B
dt <- bqExecuteQuery(
  sql = "SELECT COUNT(*) as size FROM my_table WHERE group IN UNNEST(@group)",
  group = bq_param_array(input$groups),
  use.legacy.sql = FALSE
)
# Check if default dataset exists
bqDatasetExists()
# Create dataset
bqCreateDataset("my_dataset")
# Drop dataset
bqDeleteDataset("my_dataset")
You can protect a dataset from programmatic deletion by adding a delete:never label (key:value) to it.
bqDownloadQuery() and bqDownloadQueryFile() allow downloading large result sets through Google Cloud Storage; a usage sketch follows the environment variables below.
You will need to provide environment variables to support GCS processing:
# .Renviron
GCS_AUTH_FILE= # path to json file with service token
GCS_BUCKET= # gcs bucket where tables will exported to
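For illustration, a minimal sketch of how these functions might be called; the table and file names are placeholders and the signatures are assumed to mirror bqExecuteQuery() and bqExecuteFile() above:
# Download a large result set, exported via the GCS bucket configured above
# (assumed signature)
dt <- bqDownloadQuery("SELECT * FROM my_large_table")

# The same with the query stored in a template file (assumed signature)
dt <- bqDownloadQueryFile("large-extract.sql")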
If your raw data is in daily partitioned tables, you can transform it into a new
partitioned table with the transformation defined in a query template file.
The date of each partition is passed into the template as the first parameter
in yyyymmdd format.
-- transformation_count.sql
SELECT COUNT(*) AS rows FROM my_table$%1$s
# etl.R
bqTransformPartition("my_new_table_1", "transformation_count.sql")
The range of dates that will be backfilled is limited by these environment variables:
# .Renviron
BIGQUERY_START_DATE=
BIGQUERY_END_DATE=
# Initiate an empty table from a JSON schema
bqInitiateTable("new_table", "new_table_schema.json", partition = TRUE)
# Create table from results of a query
bqCreateTable("SELECT * FROM my_table", "my_table_2")
# Create table by uploading data.table
bqInsertData("my_table", cars)
You can also use the bqInsertLargeData() function, which loads data into a BigQuery table through a temporary file in Google Cloud Storage:
# Note: you need to load googleCloudStorageR in your pipeline
# for authentication.
library(googleCloudStorageR)
bqInsertLargeData("my_table", cars)
Two additional environment variables are required to connect to Cloud Storage:
# .Renviron
GCS_AUTH_FILE= # path to json token file to access Cloud Storage
GCS_BUCKET= # bucket where temporary json file with data will be created
To access AWS S3 storage, provide the following environment variables:
# .Renviron
AWS_ACCESS_KEY_ID # Access key
AWS_SECRET_ACCESS_KEY # Secret key
AWS_DEFAULT_REGION # Default region (e.g. `us-east-1`)
AWS_S3_BUCKET # Name of the S3 bucket
AWS_S3_ROOT # Root path, see examples below.
Read data from a CSV file into a data.table:
# s3://{AWS_S3_BUCKET}/{AWS_S3_ROOT}/path/to/myfile.csv
dt <- s3GetFile("path/to/myfile.csv")
Write data from a data.table into a CSV file:
# s3://{AWS_S3_BUCKET}/{AWS_S3_ROOT}/path/to/myfile.csv
s3PutFile(dt, "path/to/myfile.csv")
s3GetFile() and s3PutFile() support a limited set of file extensions.
s3GetData() allows loading data from multiple files that start with the same path:
# s3://{AWS_S3_BUCKET}/{AWS_S3_ROOT}/path/to/myfile_1.csv
# s3://{AWS_S3_BUCKET}/{AWS_S3_ROOT}/path/to/myfile_2.csv
# s3://{AWS_S3_BUCKET}/{AWS_S3_ROOT}/path/to/myfile_3.csv
dt <- s3GetData("path/to/myfile_")
A .Renviron file should be configured to set up the environment variables.
Here is a small example with two variables:
GCS_AUTH_FILE=access_token.json
GCS_BUCKET=my-bucket
Full details on environment variables in R sessions can be found in help(Startup).
retl is set up for automated tests on Travis.
Some of the tests rely on authentication to the GCP project madecom-test-retl using an encrypted token stored in inst/secret/retl-testing.json. The token is decrypted with a password that needs to be stored in your .Renviron file (RETL_PASSWORD) and can be fetched from the vault (secret/bi/retl_password).
# .Renviron
BIGQUERY_TEST_PROJECT=madecom-test-retl
RETL_PASSWORD={FETCH VALUE FROM VAULT}
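With these variables in place, the test suite can be run locally in the usual way; a sketch assuming the standard devtools workflow:
# Run the package tests; authentication uses the decrypted service token
devtools::test()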