```r
knitr::opts_chunk$set(
  collapse = TRUE,
  comment = "#>"
)
devtools::load_all()
```
```{python, include=FALSE}
import os

import pandas as pd
import awswrangler as wr
import pydbtools as pydb

foldername = "mratford"  # GH username
foldername = foldername.lower().replace("-", "_")

bucketname = "alpha-everyone"
s3_base_path = f"s3://{bucketname}/{foldername}/"

db_name = "aws_example_dbtools"
source_db_base_path = f"s3://{bucketname}/{foldername}/source_db/"

# Clear out any existing example data and database
if wr.s3.list_objects(s3_base_path):
    print("deleting objs")
    wr.s3.delete_objects(s3_base_path)

df_dbs = wr.catalog.databases(None)
if db_name in df_dbs["Database"].to_list():
    print(f"{db_name} found deleting")
    wr.catalog.delete_database(name=db_name)

wr.catalog.create_database(db_name)

# Load the example CSVs into the source database as parquet tables
for table_name in ["department", "employees", "sales"]:
    df = pd.read_csv(f"data/{table_name}.csv")
    table_path = os.path.join(source_db_base_path, table_name)
    wr.s3.to_parquet(
        df=df,
        path=table_path,
        index=False,
        dataset=True,  # True allows the other params below, i.e. overwriting to db.table
        database=db_name,
        table=table_name,
        mode="overwrite",
    )
```
`dbtools` is a library used to query AWS Athena databases from R on the Ministry of Justice's Analytical Platform. It uses the Python library `pydbtools` and inherits much of its functionality, including creating and querying temporary tables and injecting SQL queries with template arguments.

```r
library(dbtools)
```
The `read_sql_query` function is used to obtain R dataframes from SQL queries sent to Athena.

```r
read_sql_query("select * from aws_example_dbtools.employees limit 5")
```
If a tibble is preferred, the `read_sql` function is provided.

```r
read_sql("select * from aws_example_dbtools.department limit 5", return_df_as="tibble")
```
or, for a `data.table`,

```r
read_sql("select * from aws_example_dbtools.sales limit 5", return_df_as="data.table")
```
The `create_temp_table` function allows you to create tables which can be referred to in subsequent queries from the `__temp__` database. For example, to create a table showing total sales per employee from the tables above, first create a temporary total sales table.
sql <- " SELECT employee_id, sum(sales) as total_sales FROM aws_example_dbtools.sales GROUP BY employee_id " create_temp_table(sql, table_name="total_sales")
Then create a table of employees from the sales department.
sql <- " SELECT e.employee_id, e.forename, e.surname, d.department_name FROM aws_example_dbtools.employees AS e LEFT JOIN aws_example_dbtools.department AS d ON e.department_id = d.department_id WHERE e.department_id = 1 " create_temp_table(sql, table_name="sales_employees")
The two temporary tables can then be joined to provide the final table.
sql <- " SELECT se.*, ts.total_sales FROM __temp__.sales_employees AS se INNER JOIN __temp__.total_sales AS ts ON se.employee_id = ts.employee_id " read_sql_query(sql)
Sometimes you will want to run similar SQL queries which differ only by, for example, table or column names. In these cases SQL templates can be created that are populated with templated variables, using Jinja2 templating (https://jinja2docs.readthedocs.io/en/stable/index.html). For example,
sql_template = "select * from {{ db_name }}.{{ table }} limit 10" sql <- render_sql_template(sql_template, list(db_name="aws_example_dbtools", table="department")) sql
The rendered SQL can then be used to query Athena as usual.
```r
read_sql_query(sql)
```
The same template can be used to read a different table.
```r
sql <- render_sql_template(
  sql_template,
  list(db_name="aws_example_dbtools", table="sales")
)
read_sql_query(sql)
```
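Because a template is just a string, it can also be reused programmatically. As a rough sketch (assuming the `sql_template` and `aws_example_dbtools` tables defined above), the same template could be rendered for each source table in turn:

```r
# Sketch: render the same template for several tables and collect the results
# in a named list of dataframes.
tables <- c("department", "employees", "sales")
results <- lapply(tables, function(tbl) {
  sql <- render_sql_template(
    sql_template,
    list(db_name = "aws_example_dbtools", table = tbl)
  )
  read_sql_query(sql)
})
names(results) <- tables
```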
Perhaps more usefully, we can use SQL templates saved as files, which means we can make use of our editors' and IDEs' SQL capabilities.
cat("SELECT * FROM {{ db_name }}.{{ table_name }}", file="tempfile.sql") sql <- get_sql_from_file("tempfile.sql", jinja_args=list(db_name="aws_example_dbtools", table_name="department")) read_sql_query(sql)
In this section we will create a new database from our existing database in Athena. Use the `start_query_execution_and_wait` function to run the SQL creating the database.
sql <- " CREATE DATABASE IF NOT EXISTS new_db_dbtools COMMENT 'Example of running queries and insert into' LOCATION 's3://alpha-everyone/dbtools/new_db/' " response <- start_query_execution_and_wait(sql) response$Status$State
Create a derived table in the new database with a CTAS (CREATE TABLE AS SELECT) query, which both writes the output data to S3 and creates the table's schema. Note that this only inserts the data from quarters 1 and 2.
sql <- " CREATE TABLE new_db_dbtools.sales_report WITH ( external_location='s3://alpha-everyone/dbtools/new_db/sales_report' ) AS SELECT qtr as sales_quarter, sum(sales) AS total_sales FROM aws_example_dbtools.sales WHERE qtr IN (1,2) GROUP BY qtr " response <- start_query_execution_and_wait(sql) response$Status$State
We can now use an `INSERT INTO` query to add the data from quarters 3 and 4, as the schema has already been created.
sql <- " INSERT INTO new_db_dbtools.sales_report SELECT qtr as sales_quarter, sum(sales) AS total_sales FROM aws_example_dbtools.sales WHERE qtr IN (3,4) GROUP BY qtr " response <- start_query_execution_and_wait(sql) read_sql_query("select * from new_db_dbtools.sales_report")
Do the same as before, but partition the data based on when the report was run. This can make queries more efficient, as filtering on the partition columns reduces the amount of data scanned, and it also makes incrementally adding data easier.
sql <- " CREATE TABLE new_db_dbtools.daily_sales_report WITH ( external_location='s3://alpha-everyone/dbtools/new_db/daily_sales_report', partitioned_by = ARRAY['report_date'] ) AS SELECT qtr as sales_quarter, sum(sales) AS total_sales, date '2021-01-01' AS report_date FROM aws_example_dbtools.sales GROUP BY qtr, date '2021-01-01' " response <- start_query_execution_and_wait(sql) response$Status$State
Then, simulating a source database that is updated daily, add more partitioned data.
sql <- " INSERT INTO new_db_dbtools.daily_sales_report SELECT qtr as sales_quarter, sum(sales) AS total_sales, date '2021-01-02' AS report_date FROM aws_example_dbtools.sales GROUP BY qtr, date '2021-01-02' " response <- start_query_execution_and_wait(sql) read_sql_query("select * from new_db_dbtools.daily_sales_report")
We can remove a partition and its underlying data using `delete_partitions_and_data`, which uses an expression to match partitions; see https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/glue.html#Glue.Client.get_partitions for more details.
delete_partitions_and_data("new_db_dbtools", "daily_sales_report", "report_date = '2021-01-02'") read_sql_query("select * from new_db_dbtools.daily_sales_report")
Similarly we can remove a table and its data,
```r
delete_table_and_data("new_db_dbtools", "daily_sales_report")
```
or the whole database.
delete_database_and_data("new_db_dbtools")
delete_database_and_data("aws_example_dbtools")