```{r, include = FALSE}
knitr::opts_chunk$set(
  collapse = TRUE,
  comment = "#>"
)
devtools::load_all()
```

```{python, include=FALSE}
import os
import pandas as pd
import awswrangler as wr
import pydbtools as pydb

# setup your own testing area (set foldername = GH username)
foldername = "mratford"  # GH username
foldername = foldername.lower().replace("-", "_")

bucketname = "alpha-everyone"
s3_base_path = f"s3://{bucketname}/{foldername}/"

db_name = "aws_example_dbtools"
source_db_base_path = f"s3://{bucketname}/{foldername}/source_db/"

# Delete all the s3 files in a given path
if wr.s3.list_objects(s3_base_path):
    print("deleting objs")
    wr.s3.delete_objects(s3_base_path)

# Delete the database if it exists
df_dbs = wr.catalog.databases(None)
if db_name in df_dbs["Database"].to_list():
    print(f"{db_name} found deleting")
    wr.catalog.delete_database(name=db_name)

# Setup source database
# Create the database
wr.catalog.create_database(db_name)

# Iterate through the tables in data/ and write them to our db using awswrangler
for table_name in ["department", "employees", "sales"]:
    df = pd.read_csv(f"data/{table_name}.csv")
    table_path = os.path.join(source_db_base_path, table_name)
    wr.s3.to_parquet(
        df=df,
        path=table_path,
        index=False,
        dataset=True,  # True allows the other params below i.e. overwriting to db.table
        database=db_name,
        table=table_name,
        mode="overwrite",
    )
```

`dbtools` is a library used to query AWS Athena databases from R on the 
Ministry of Justice's Analytical Platform. It uses the Python library
`pydbtools` and inherits much of its functionality, including creating
and querying temporary tables and injecting SQL queries
with template arguments.

```r
library(dbtools)
```

## Reading SQL queries

The `read_sql_query` function is used to obtain R data frames from SQL queries sent to Athena.

read_sql_query("select * from aws_example_dbtools.employees limit 5")

If a tibble is preferred, the `read_sql` function is provided

read_sql("select * from aws_example_dbtools.department limit 5",
         return_df_as="tibble")

or for a `data.table`.

read_sql("select * from aws_example_dbtools.sales limit 5",
         return_df_as="data.table")

## Creating temporary SQL tables

The `create_temp_table` function allows you to create tables that can be referred to in subsequent queries via the `__temp__` database. For example, to build a table showing total sales per employee from the tables above, first create a temporary total sales table.

sql <- "
SELECT employee_id, sum(sales) as total_sales
FROM aws_example_dbtools.sales
GROUP BY employee_id
"
create_temp_table(sql, table_name="total_sales")
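The new table can then be queried from the `__temp__` database like any other table, for example:

```r
# Check the contents of the temporary table
read_sql_query("select * from __temp__.total_sales limit 5")
```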

Then create a table of employees from the sales department.

sql <- "
SELECT e.employee_id, e.forename, e.surname, d.department_name
FROM aws_example_dbtools.employees AS e
LEFT JOIN aws_example_dbtools.department AS d
ON e.department_id = d.department_id
WHERE e.department_id = 1
"
create_temp_table(sql, table_name="sales_employees")

The two temporary tables can then be joined to provide the final table.

sql <- "
SELECT se.*, ts.total_sales
FROM __temp__.sales_employees AS se
INNER JOIN __temp__.total_sales AS ts
ON se.employee_id = ts.employee_id
"
read_sql_query(sql)

## SQL templating

Sometimes you will want to run similar SQL queries that differ only by, for example, table or column names. In these cases SQL templates can be created and populated with template variables, using [Jinja2 templating](https://jinja2docs.readthedocs.io/en/stable/index.html). For example,

```r
sql_template <- "select * from {{ db_name }}.{{ table }} limit 10"
sql <- render_sql_template(sql_template,
                           list(db_name="aws_example_dbtools",
                                table="department"))
sql
```

The rendered SQL can then be used to query Athena as usual.

```r
read_sql_query(sql)
```

The same template can be used to read a different table.

```r
sql <- render_sql_template(sql_template,
                           list(db_name="aws_example_dbtools",
                                table="sales"))
read_sql_query(sql)
```
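Because the rendered SQL is just a string, the same template can also be used in a loop; a minimal sketch, assuming the three example tables created above:

```r
# Render the same template for each table and collect the results
# in a named list of data frames
tables <- c("department", "employees", "sales")
results <- lapply(tables, function(tbl) {
  read_sql_query(render_sql_template(
    sql_template,
    list(db_name = "aws_example_dbtools", table = tbl)
  ))
})
names(results) <- tables
```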

Perhaps more usefully, SQL templates can be saved as files, which means we can make use of our editors' and IDEs' SQL capabilities.

cat("SELECT * FROM {{ db_name }}.{{ table_name }}", file="tempfile.sql")

```r
sql <- get_sql_from_file("tempfile.sql",
                         jinja_args=list(db_name="aws_example_dbtools",
                                         table_name="department"))
read_sql_query(sql)
```

## Advanced usage

### Creating and maintaining database tables in Athena

In this section we will create a new database from our existing database in Athena. Use the `start_query_execution_and_wait` function to run the SQL that creates the database.

sql <- "
CREATE DATABASE IF NOT EXISTS new_db_dbtools
COMMENT 'Example of running queries and insert into'
LOCATION 's3://alpha-everyone/dbtools/new_db/'
"

response <- start_query_execution_and_wait(sql)
response$Status$State
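The returned status follows the Athena query execution response, so the state string can be checked before running anything that depends on the query; a minimal sketch:

```r
# Stop if the query did not succeed; Athena reports the state as
# e.g. "QUEUED", "RUNNING", "SUCCEEDED", "FAILED" or "CANCELLED"
stopifnot(response$Status$State == "SUCCEEDED")
```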

Create a derived table in the new database with a CTAS (`CREATE TABLE AS SELECT`) query, which both writes the output to S3 and creates the schema of the table. Note that this only inserts the data from quarters 1 and 2.

sql <- "
CREATE TABLE new_db_dbtools.sales_report WITH
(
    external_location='s3://alpha-everyone/dbtools/new_db/sales_report'
) AS
SELECT qtr as sales_quarter, sum(sales) AS total_sales
FROM aws_example_dbtools.sales
WHERE qtr IN (1,2)
GROUP BY qtr
"

response <- start_query_execution_and_wait(sql)
response$Status$State

We can now use an `INSERT INTO` query to add the data from quarters 3 and 4, as the schema has already been created.

sql <- "
INSERT INTO new_db_dbtools.sales_report
SELECT qtr as sales_quarter, sum(sales) AS total_sales
FROM aws_example_dbtools.sales
WHERE qtr IN (3,4)
GROUP BY qtr
"

response <- start_query_execution_and_wait(sql)
read_sql_query("select * from new_db_dbtools.sales_report") 

### Creating a table with partitions

Do the same as before but partition the data based on when the report was run. This can make queries more efficient, as filtering on the partition columns reduces the amount of data scanned, and it makes incrementally adding data easier.

sql <- "
CREATE TABLE new_db_dbtools.daily_sales_report WITH
(
    external_location='s3://alpha-everyone/dbtools/new_db/daily_sales_report',
    partitioned_by = ARRAY['report_date']
) AS
SELECT qtr as sales_quarter, sum(sales) AS total_sales,
date '2021-01-01' AS report_date
FROM aws_example_dbtools.sales
GROUP BY qtr, date '2021-01-01'
"

response <- start_query_execution_and_wait(sql)
response$Status$State

Then, simulating a source database that is updated daily, add more partitioned data.

sql <- "
INSERT INTO new_db_dbtools.daily_sales_report
SELECT qtr as sales_quarter, sum(sales) AS total_sales,
date '2021-01-02' AS report_date
FROM aws_example_dbtools.sales
GROUP BY qtr, date '2021-01-02'
"

response <- start_query_execution_and_wait(sql)
read_sql_query("select * from new_db_dbtools.daily_sales_report")

We can remove a partition and its underlying data using `delete_partitions_and_data`, which uses an expression to match partitions - see https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/glue.html#Glue.Client.get_partitions for more details.

```r
delete_partitions_and_data("new_db_dbtools", "daily_sales_report",
                           "report_date = '2021-01-02'")
read_sql_query("select * from new_db_dbtools.daily_sales_report")
```

Similarly, we can remove a table and its data,

```r
delete_table_and_data("new_db_dbtools", "daily_sales_report")
```

or the whole database.

```r
delete_database_and_data("new_db_dbtools")
delete_database_and_data("aws_example_dbtools")
```

