### STEP 1: LOAD FUNCTIONS, CONFIG FILE, DEFINE DIRECTORY VARIABLES, AND CHECK CREDENTIALS
message("STEP 1: Loading Functions, Config File, Defining Variables, and Check SFTP Credentials...")
source("apcd_import_functions.R")
config <- yaml::read_yaml("apcd_import_config.yaml")
#Define directories for downloaded files and extracted files.
base_dir <- config$base_dir
ref_dir <- paste0(base_dir, "/ref_schema")
stage_dir <- paste0(base_dir, "/stage_schema")
final_dir <- paste0(base_dir, "/final_schema")
apcd_prep_check_f(config)
files <- data.frame()
memory.limit(size = 56000)
### STEP 2: REVIEW SFTP FILES AND CREATE ETL ENTRIES
message("STEP 2: Review SFTP Files and Create New ETL Entries")
message("Getting SFTP file list...")
files <- apcd_ftp_get_file_list_f(config)
message("Comparing current ETL log with SFTP file list...")
etl_list <- apcd_etl_get_list_f(config)
files <- files %>%
anti_join(etl_list, by = "file_name")
message("Create ETL entries for new SFTP files...")
if(nrow(files) > 0) {
for(f in 1:nrow(files)) {
files[f, "etl_id"] <- apcd_etl_entry_f(config,
file_name = files[f,]$file_name,
file_date = files[f,]$file_date,
file_schema = files[f,]$schema,
file_table = files[f,]$table,
file_number = files[f,]$file_number)
}
} else {
message("No new SFTP files on server...")
}
### STEP 3: CHOOSE SCHEMAS AND TABLES TO DOWNLOAD, THEN DOWNLOAD FILES
# Select which schemas and tables to download the files
etl_list <- apcd_etl_get_list_f(config)
if(!is.Date(files$file_date)) {
files$file_date <- as.Date(files$file_date)
}
if(!is.Date(etl_list$file_date)) {
etl_list$file_date <- as.Date(etl_list$file_date)
}
if(nrow(files) > 0) {
files <- files %>% left_join(etl_list) %>% filter(is.na(datetime_download))
} else {
files <- etl_list %>% filter(is.na(datetime_download))
}
if(nrow(files) > 0) {
files <- files %>% left_join(etl_list) %>% filter(is.na(datetime_download))
} else {
files <- etl_list %>% filter(is.na(datetime_download))
}
if(nrow(files) > 0) {
message(paste0("Begin Downloading ", nrow(files), " Files from SFTP..."))
for(f in 1:nrow(files)) {
message(paste0("...Downloading File: " , f, ": ", files[f, "file_name"], "..."))
if(files[f, "file_schema"] == "ref") {
files[f, "file_path"] <- ref_dir
} else if(files[f, "file_schema"] == "stage") {
files[f, "file_path"] <- stage_dir
} else {
files[f, "file_path"] <- final_dir
}
files[f, "file_path"] <- paste0(files[f, "file_path"], "/", files[f, "file_name"])
files[f, "datetime_download"] <- apcd_ftp_get_file_f(config,
file = files[f, ])
message(paste0("......Download Complete. ", nrow(files) - f, " of ", nrow(files), " left to download..."))
}
message("All Files Downloaded...")
} else {
message("No files to Download...")
}
### STEP 4: EXTRACT AND LOAD DATA FROM FILES INTO SQL
# Select which schemas and tables to import
etl_list <- apcd_etl_get_list_f(config)
files <- etl_list %>% filter(is.na(datetime_load)) %>% filter(!is.na(datetime_download))
if(nrow(files) > 0) {
message(paste0("Begin Loading ", nrow(files), " Files into SQL Server..."))
import_errors <- list()
for(f in 1:nrow(files)) {
message(paste0("...Loading File: " , f, ": ", files[f, "file_name"], "..."))
result <- apcd_data_load_f(config, file = files[f, ])
message(paste0("......Loading Complete. ", nrow(files) - f, " of ", nrow(files), " left to import..."))
if(!is.na(result)) {
import_errors <- append(import_errors, result)
}
}
message("All Files Loaded...")
if(length(import_errors) == 0) {
message("No errors to report...")
} else {
message(paste0("There were ", length(import_errors), " error(s):"))
for(x in 1:length(import_errors)) {
message(import_errors[x])
}
}
} else {
message("No files to Load...")
}
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.