The retl package provides tools for building ETL pipelines in R. For example, safeLookup() is an alternative to merge() that enforces a safe left join.
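A minimal sketch of the idea with hypothetical tables (the exact safeLookup() signature is assumed here; check the package documentation):

library(data.table)
library(retl)

orders <- data.table(id = 1:3, product = c("a", "b", "a"))
prices <- data.table(product = c("a", "b"), price = c(10, 20))

# Unlike merge(), a safe left join keeps every row of `orders` and
# never silently drops or duplicates rows.
# (Sketch: argument order and names are assumptions.)
dt <- safeLookup(orders, prices, by = "product")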
To access BigQuery you will need the following environment variables:
# .Renviron
BIGQUERY_PROJECT # name of the project in BigQuery. Default project cannot be changed in the code.
BIGQUERY_DATASET # name of the default dataset in BigQuery. You can override default dataset in most functions.
BIGQUERY_ACCESS_TOKEN_PATH # path to the json token file.
BIGQUERY_LEGACY_SQL # query will be executed with legacy flavour if set to `TRUE`.
BigQuery functions wrap bigrquery functions to provide a higher-level API that removes the boilerplate of the lower-level API.
You can parameterise your SQL using positional matching (sprintf) if you don't name arguments in the call to bqExecuteQuery():
# Running query to get the size of group A in Legacy dialect
dt <- bqExecuteQuery("SELECT COUNT(*) as size FROM my_table WHERE group = `%1$s`", "A")
# You can also save the query template in a file and get results like this
dt <- bqExecuteFile("group-size.sql", "A")
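For reference, the file would hold the same templated SQL as the inline example above (hypothetical content for group-size.sql):

-- group-size.sql
SELECT COUNT(*) as size FROM my_table WHERE group = '%1$s'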
You can use named parameters in a query template with standard SQL. You have to give matching names to the arguments in the bqExecuteQuery() call:
# Running query to get the size of group A
dt <- bqExecuteQuery(
  sql = "SELECT COUNT(*) as size FROM my_table WHERE group = @group",
  group = "A",
  use.legacy.sql = FALSE
)
In the example above, the group argument will be matched with the @group parameter and BigQuery will execute the following query:
SELECT
COUNT(*) as size
FROM
my_table
WHERE
group = 'A' -- parameter is replaced by matching argument in the call
You can also pass an array into a query like this:
# Running query to get the size of groups A and B
dt <- bqExecuteQuery(
  sql = "SELECT COUNT(*) as size FROM my_table WHERE group IN UNNEST(@group)",
  group = c("A", "B"),
  use.legacy.sql = FALSE
)
Sometimes you don't know in advance the length of the vector that will be passed into a query. UNNEST in the query above expects an array, which is not created for scalars. To avoid an error in dynamic scripts (e.g. Shiny), you can enforce an array with bq_param_array() like this:
# Running query to get the size of groups A and B
dt <- bqExecuteQuery(
  sql = "SELECT COUNT(*) as size FROM my_table WHERE group IN UNNEST(@group)",
  group = bq_param_array(input$groups),
  use.legacy.sql = FALSE
)
# Check if default dataset exists
bqDatasetExists()
# Create dataset
bqCreateDataset("my_dataset")
# Drop dataset
bqDeleteDataset("my_dataset")
You can protect a dataset from programmatic deletion by adding a delete:never label (key:value) to it.
bqDownloadQuery() and bqDownloadQueryFile() allow downloading large result sets through Google Cloud Storage. You will need to provide environment variables to support GCS processing:
# .Renviron
GCS_AUTH_FILE= # path to json file with service token
GCS_BUCKET= # GCS bucket where tables will be exported to
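A usage sketch, assuming the two functions accept a query string and a template file in the same way as bqExecuteQuery() and bqExecuteFile():

# Download a large result set through Cloud Storage
dt <- bqDownloadQuery("SELECT * FROM my_table")

# Same, but with the query template stored in a file
dt <- bqDownloadQueryFile("my-query.sql")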
If your raw data is in daily partitioned tables, you can transform the data into a new partitioned table with a transformation defined in a query template file. The date of each partition is passed into the template as the first parameter in yyyymmdd format.
-- transformation_count.sql
SELECT COUNT(*) AS rows FROM my_table$%1$s
# etl.R
bqTransformPartition("my_new_table_1", "transformation_count.sql")
The range of dates that will be backfilled is limited by these environment variables:
# .Renviron
BIGQUERY_START_DATE= # first partition date to backfill
BIGQUERY_END_DATE= # last partition date to backfill
# Initiate empty table from json schema
bqInitiateTable("new_talbe", "new_table_chema.json", partition = TRUE)
# Create table from results of a query
bqCreateTable("SELECT * FROM my_table", "my_table_2")
# Create table by uploading data.table
bqInsertData("my_table", cars)
You can also use the bqInsertLargeData() function, which loads data into a BigQuery table through a temporary file in Google Cloud Storage:
# Note: you need to load googleCloudStorageR in your pipeline
# for authentication.
library(googleCloudStorageR)
bqInsertLargeData("my_table", cars)
Two additional environment variables are required for the connection with Cloud Storage:
# .Renviron
GCS_AUTH_FILE= # path to json token file to access Cloud Storage
GCS_BUCKET= # bucket where temporary json file with data will be created
To access AWS S3 storage provide the following environment variables:
# .Renviron
AWS_ACCESS_KEY_ID # Access key
AWS_SECRET_ACCESS_KEY # Secret key
AWS_DEFAULT_REGION # Default region (e.g. `us-east-1`)
AWS_S3_BUCKET # Name of the S3 bucket
AWS_S3_ROOT # Root path, see examples below.
Read data from a CSV file into a data.table:
# s3://{AWS_S3_BUCKET}/{AWS_S3_ROOT}/path/to/myfile.csv
dt <- s3GetFile("path/to/myfile.csv")
Write data from a data.table into a CSV file:
# s3://{AWS_S3_BUCKET}/{AWS_S3_ROOT}/path/to/myfile.csv
s3PutFile(dt, "path/to/myfile.csv")
s3GetFile and s3PutFile support multiple file extensions; the examples above use csv.
s3GetData allows loading data from many files that start with the same path:
# s3://{AWS_S3_BUCKET}/{AWS_S3_ROOT}/path/to/myfile_1.csv
# s3://{AWS_S3_BUCKET}/{AWS_S3_ROOT}/path/to/myfile_2.csv
# s3://{AWS_S3_BUCKET}/{AWS_S3_ROOT}/path/to/myfile_3.csv
dt <- s3GetData("path/to/myfile_")
A .Renviron file should be configured to set up the environment variables. Here is a small example with two variables:
GCS_AUTH_FILE=access_token.json
GCS_BUCKET=my-bucket
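To check that the variables are picked up by the current R session, you can use base R functions:

# Reload .Renviron and inspect one of the variables
readRenviron("~/.Renviron")
Sys.getenv("GCS_BUCKET")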
Full details on environments in R sessions can be found in help(Startup).
retl is set up for automated tests on Travis. Some of the tests rely on authentication to the GCP project madecom-test-retl using an encrypted token stored in inst/secret/retl-testing.json. The token is decrypted with a password that needs to be stored in your .Renviron file (RETL_PASSWORD) and can be fetched from the vault (secret/bi/retl_password).
# .Renviron
BIGQUERY_TEST_PROJECT=madecom-test-retl
RETL_PASSWORD={FETCH VALUE FROM VAULT}