##############################
# STAGE 1: SETUP #
##############################
library(tidyverse)
library(dbplyr)
library(rjson)
library(aws.secrets)
library(httr)
library(RPostgreSQL)
library(NbClust)
library(scales)
library(RJDBC)
# Connect to Redshift with AWS secrets
# Fetch the temporary IAM role credentials from the EC2 instance metadata
# endpoint; the token and keys are used later in the script to authorise the
# Redshift COPY from S3.
s3credentials_response <- GET("http://169.254.169.254/latest/meta-data/iam/security-credentials/live-r-server-r-server-ComponentRole-1P8OXESQT6GMR")
# Fail here with the HTTP status instead of a confusing JSON parse error later
stop_for_status(s3credentials_response)
# Read the body as text explicitly so fromJSON() always receives a string,
# whatever content type the endpoint reports
s3credentials <- content(s3credentials_response, as = "text", encoding = "UTF-8")
s3credentials2 <- fromJSON(s3credentials)
token <- s3credentials2$Token
secret_access_key <- s3credentials2$SecretAccessKey
access_key_id <- s3credentials2$AccessKeyId

# Set AWS region
Sys.setenv("AWS_DEFAULT_REGION" = "eu-west-1")

# Load the Redshift username and password from AWS Secrets Manager
secret <- get_secret_value("servers/r_server/prod/credentials/scv_redshift")$SecretString
secret <- fromJSON(secret)

# Connect to Redshift
redshift <- src_postgres(
  host = "live-idl-prod-redshift-component-redshiftcluster-1q6vyltqf8lth.ctm1v7db0ubd.eu-west-1.redshift.amazonaws.com",
  port = "5439",
  dbname = "redshiftdb",
  user = secret$redshift_username,
  password = secret$redshift_password
)

# Load core data: the whole user matrix is pulled into memory
user_matrix <- dbGetQuery(redshift$con, "SELECT * FROM central_insights_sandbox.dh_user_matrix")
##############################
# STAGE 2: DATA PROCESSING #
##############################
# Keep only the latent feature columns (names matching the regex "f[0-9]")
# and remember their names for later selections
user_matrix_reduced <- select(user_matrix, tidyselect::matches("f[0-9]"))
latent_features <- names(user_matrix_reduced)

# Fix the seed, then draw a 10,000-row sample of users
set.seed(1234)
user_matrix_reduced_sample <- sample_n(user_matrix_reduced, 10000)

# Search k = 5..20 with k-means on the sample, scored by the silhouette index
nbclust_k <- NbClust(
  data = user_matrix_reduced_sample,
  min.nc = 5,
  max.nc = 20,
  method = "kmeans",
  index = "silhouette"
)

# The first element of Best.nc is taken as the optimum k
optimum_k <- nbclust_k$Best.nc[1]

# Refit k-means on the sample with the optimum k and keep the centres
kmodel <- kmeans(user_matrix_reduced_sample, centers = optimum_k, nstart = 25)
cluster_centres <- kmodel$centers
# Assign one user to the nearest cluster centre.
#
# Args:
#   user_scores: numeric vector of the user's latent feature scores, in the
#     same column order as `centres`.
#   centres: matrix of cluster centres, one row per cluster. Defaults to the
#     global `cluster_centres`, so existing one-argument calls (for example
#     through apply()) behave exactly as before.
#
# Returns:
#   The row index of the closest centre in `centres`, measured with dist()'s
#   default (Euclidean) distance.
assign_cluster <- function(user_scores, centres = cluster_centres) {
  # Row 1 of the stacked matrix is the user; the remaining rows are the centres
  distances <- as.matrix(dist(rbind(user_scores, centres)))
  # Column 1 holds each row's distance to the user; drop the user's own entry
  distances_to_centres <- unname(distances[2:nrow(distances), 1])
  which.min(distances_to_centres)
}
# Assign every user to their nearest cluster centre. apply() coerces the data
# frame to a matrix row by row; at this point user_matrix_reduced holds only
# the latent feature columns (presumably all numeric, since k-means was fitted
# on a sample of them).
user_matrix_reduced$cluster <- apply(user_matrix_reduced, 1, assign_cluster)
user_matrix$cluster <- as.factor(user_matrix_reduced$cluster)

# Save cluster sizes
user_clustersizes <- user_matrix %>% count(cluster)

# Create aggregated data frames to reduce the data load in the Shiny app.
# Mean of every latent feature per cluster, plus a display label with the
# cluster size.
user_aggbycluster <- user_matrix %>%
  # all_of() makes it explicit that latent_features is an external character
  # vector of column names, not a column of user_matrix
  select(cluster, tidyselect::all_of(latent_features)) %>%
  group_by(cluster) %>%
  summarise_all(mean) %>%
  # Join on the cluster key explicitly rather than on whatever columns match
  left_join(user_clustersizes, by = "cluster") %>%
  mutate(label = paste0("Cluster ", cluster, ":\n", comma(n), " users"))

# Share of each gender within every cluster (rows with missing gender dropped).
# The result stays grouped by cluster, so sum(n) is the per-cluster total.
user_aggbygender <- user_matrix %>%
  filter(!is.na(gender)) %>%
  group_by(cluster) %>%
  count(gender) %>%
  mutate(prop = n / sum(n))

# Share of each age range within every cluster ("unknown" rows dropped; the
# comparison also drops rows where age_range is NA).
user_aggbyage <- user_matrix %>%
  filter(age_range != "unknown") %>%
  group_by(cluster) %>%
  count(age_range) %>%
  mutate(prop = n / sum(n))
library(aws.s3)

# Write the clustered user matrix to S3 as CSV so Redshift can COPY it
s3_bucket <- "rstudio-input-output"
my_table <- user_matrix
colnames(my_table)[1] <- "user_id"
s3write_using(
  my_table,
  FUN = write.csv,
  row.names = FALSE,
  bucket = s3_bucket,
  object = "joshua_feldman/user_matrix"
)

# Derive a Redshift column definition for every column of my_table.
# Only the first class is used so that multi-class columns (e.g. POSIXct)
# still yield exactly one type per column.
db_coltypes <- vapply(my_table, function(x) class(x)[1], character(1))

# Flag numeric columns whose values are all whole numbers. A numeric column
# containing NA yields NA here, which deliberately falls through to the
# VARCHAR(1000) fallback below (the CSV stores such values as the text "NA").
integers <- vapply(
  my_table,
  function(x) {
    if (!is.numeric(x)) FALSE else all(x %% 1 == 0)
  },
  logical(1)
)

db_coltypes <- data.frame(
  names = names(db_coltypes),
  type = db_coltypes,
  integers = integers,
  stringsAsFactors = FALSE
)

db_coltypes <- db_coltypes %>%
  # as order and column are reserved sql words, we can't have them as a variable name:
  mutate(
    names = ifelse(names == "order", "myorder", names),
    names = ifelse(names == "column", "mycolumn", names),
    # replace any non-alphanumeric character (including spaces, which would
    # otherwise produce an invalid column definition) with an underscore:
    names = gsub("[^[:alnum:]_]", "_", names)
  ) %>%
  mutate(aws_type = case_when(
    grepl("factor|character", type) ~ "VARCHAR(1000)",
    grepl("integer|numeric", type) & integers ~ "INT",
    grepl("integer|numeric", type) & !integers ~ "DECIMAL(10,2)" # alter depending on decimal places if any in data
  )) %>%
  # Anything not matched above (other classes, numerics with NA) is stored as text
  mutate(aws_type = ifelse(is.na(aws_type), "VARCHAR(1000)", aws_type)) %>%
  mutate(var_def = paste(names, aws_type, sep = " "))

# Comma-separated "name TYPE" list used inside CREATE TABLE (...)
table_def <- paste(db_coltypes$var_def, collapse = ", ")
# Open a JDBC connection to Redshift using credentials held in AWS Secrets
# Manager.
#
# Args:
#   secret_location: name of the secret whose SecretString is a JSON object
#     with redshift_host, redshift_port, redshift_dbname, redshift_username
#     and redshift_password fields.
#   driver_location: path to the Redshift JDBC driver jar.
#   aws_region: value written to the AWS_DEFAULT_REGION environment variable
#     before the secret is fetched (this setting persists after the call).
#
# Returns:
#   The connection object returned by dbConnect().
get_redshift_connection <- function(
  secret_location = "servers/r_server/prod/credentials/scv_redshift",
  driver_location = "/data/projects/_tools/RedshiftJDBC41-1.1.9.1009.jar",
  aws_region = "eu-west-1"
) {
  # Set AWS region
  Sys.setenv("AWS_DEFAULT_REGION" = aws_region)

  # Get secret from AWS secrets manager
  secret <- fromJSON(get_secret_value(secret_location)$SecretString)

  # Connection string without credentials: a username or password containing
  # URL-reserved characters (&, ?, =, ...) would corrupt a query-string URL,
  # so they are passed to dbConnect() as separate arguments instead
  url <- paste0(
    "jdbc:redshift://", secret$redshift_host,
    ":", secret$redshift_port,
    "/", secret$redshift_dbname
  )

  # Load driver
  driver <- JDBC(
    "com.amazon.redshift.jdbc41.Driver",
    driver_location,
    identifier.quote = "'"
  )

  # Open and return connection
  dbConnect(
    driver,
    url,
    user = secret$redshift_username,
    password = secret$redshift_password
  )
}
conn <- get_redshift_connection()

# Recreate the target table in Redshift from the derived column definitions
dbSendUpdate(
  conn,
  paste0(
    "DROP TABLE IF EXISTS central_insights_sandbox.jf_user_matrix;\n",
    "CREATE TABLE central_insights_sandbox.jf_user_matrix (", table_def, ");"
  )
)

# Build the COPY statement that loads the CSV from S3, authorised with the
# temporary IAM credentials fetched at the top of the script
redshift_copy_statement <- paste0(
  "COPY central_insights_sandbox.jf_user_matrix FROM 's3://", s3_bucket,
  "/joshua_feldman/user_matrix' credentials 'aws_access_key_id=", access_key_id,
  ";aws_secret_access_key=", secret_access_key,
  ";token=", token,
  "' CSV DELIMITER ',' IGNOREHEADER 1 DATEFORMAT 'YYYY-MM-DD' ;"
)

# Execute the copy statement
dbSendUpdate(conn, redshift_copy_statement)

# GRANT returns no result set, so it is issued with dbSendUpdate() like the
# DDL and COPY statements above rather than with dbGetQuery()
dbSendUpdate(
  conn,
  "GRANT SELECT ON central_insights_sandbox.jf_user_matrix\nTO GROUP central_insights\n;"
)

# NOTE(review): this query looks incomplete - it references the aliases `a`
# and `b` without defining them and aggregates without a GROUP BY, so it is
# expected to fail as written. The intended join cannot be determined from
# this file; confirm with the author before relying on favourite_shows.
favourite_shows <- dbGetQuery(
  redshift$con,
  "\nSELECT a.cluster, b.top_level_editorial_object, COUNT(DISTINCT b.audience_id)\nFROM central_insights_sandbox.dh_user_matrix"
)
##############################
# STAGE 3: WRITE TO SERVER #
##############################
# Persist the three aggregated data frames as RDS files in the Shiny app's
# directory, keyed by their destination path
shiny_outputs <- list(
  "/efs/shiny-server/latent-feature-explorer/user_aggbycluster.RDS" = user_aggbycluster,
  "/efs/shiny-server/latent-feature-explorer/user_aggbygender.RDS" = user_aggbygender,
  "/efs/shiny-server/latent-feature-explorer/user_aggbyage.RDS" = user_aggbyage
)
for (rds_path in names(shiny_outputs)) {
  saveRDS(shiny_outputs[[rds_path]], rds_path)
}
# Add the following code to your website.
# For more information on customizing the embed code, read Embedding Snippets.