# R/preprocess-usermatrix.R
#
# Defines functions: assign_cluster, get_redshift_connection

##############################
# STAGE 1: SETUP             #              
##############################

library(tidyverse)
library(dbplyr)
library(rjson)
library(aws.secrets)
library(httr)
library(RPostgreSQL)
library(NbClust)
library(scales)
library(RJDBC)

# Fetch temporary IAM credentials from the EC2 instance metadata service
# (used later to authorise the Redshift COPY from S3)
s3credentials <- content(GET("http://169.254.169.254/latest/meta-data/iam/security-credentials/live-r-server-r-server-ComponentRole-1P8OXESQT6GMR"), as = "text")
s3credentials2 <- fromJSON(s3credentials)
token <- s3credentials2$Token
secret_access_key <- s3credentials2$SecretAccessKey
access_key_id <- s3credentials2$AccessKeyId

# Set AWS region
Sys.setenv("AWS_DEFAULT_REGION" = "eu-west-1")

# Load Redshift credentials from AWS Secrets Manager
secret <- get_secret_value('servers/r_server/prod/credentials/scv_redshift')$SecretString
secret <- fromJSON(secret)

# Connect to Redshift
redshift <- src_postgres(host='live-idl-prod-redshift-component-redshiftcluster-1q6vyltqf8lth.ctm1v7db0ubd.eu-west-1.redshift.amazonaws.com', port='5439', 
                         dbname = 'redshiftdb',
                         user = secret$redshift_username, 
                         password = secret$redshift_password)
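
# Optional connectivity check (illustrative, not in the original script):
# a trivial query that returns one row if the connection is live
dbGetQuery(redshift$con, "SELECT 1 AS ok")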

# Load core data
user_matrix <- dbGetQuery(redshift$con, "SELECT * FROM central_insights_sandbox.dh_user_matrix")

##############################
# STAGE 2: DATA PROCESSING   #             
##############################

# Record the names of the latent feature columns (those matching "f" + digit)
latent_features <- user_matrix %>% select(tidyselect::matches("f[0-9]")) %>% names()

# Select latent features
user_matrix_reduced <- user_matrix %>%
  select(tidyselect::matches("f[0-9]"))

# Set seed and take a sample
set.seed(1234)
user_matrix_reduced_sample <- user_matrix_reduced %>% 
  sample_n(10000)

# Choose the number of clusters on the sample via the silhouette index
nbclust_k <- NbClust(data = user_matrix_reduced_sample,
                     method = "kmeans",
                     index = "silhouette",
                     min.nc = 5,
                     max.nc = 20)

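# NbClust's Best.nc holds the chosen number of clusters in its first element
# and the corresponding silhouette value in its second.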
# Extract optimum k
optimum_k <- nbclust_k$Best.nc[1]

# Fit the model using optimum k
kmodel <- kmeans(user_matrix_reduced_sample, centers = optimum_k, nstart = 25)

# Extract the cluster centres
cluster_centres <- kmodel$centers

# Write function to assign cluster based on proximity to cluster centres
assign_cluster <- function(user_scores) {
  
  distances <- as.matrix(dist(rbind(user_scores, cluster_centres)))
  distances <- unname(distances[2:nrow(distances), 1])
  
  assigned_cluster <- which.min(distances)
  return(assigned_cluster)
  
}
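
# Illustrative spot check (not part of the original pipeline): a converged
# k-means fit should normally reproduce its own assignment for a sampled row
assign_cluster(as.numeric(user_matrix_reduced_sample[1, ]))  # compare with kmodel$cluster[1]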

# Assign cluster
user_matrix_reduced$cluster <- apply(user_matrix_reduced, 1, assign_cluster)
user_matrix$cluster <- as.factor(user_matrix_reduced$cluster)

# Save cluster sizes
user_clustersizes <- user_matrix %>% count(cluster)

# Create an aggregated data frame to reduce the data load in the Shiny app
user_aggbycluster <- user_matrix %>% 
  select(cluster, all_of(latent_features)) %>% 
  group_by(cluster) %>% 
  summarise_all(mean) %>% 
  left_join(user_clustersizes, by = "cluster") %>% 
  mutate(label = paste("Cluster ", cluster, ":\n", comma(n), " users", sep = ""))

user_aggbygender <- user_matrix %>% 
  filter(!is.na(gender)) %>% 
  group_by(cluster) %>% 
  count(gender) %>% 
  mutate(prop = n / sum(n))

user_aggbyage <- user_matrix %>% 
  filter(age_range != "unknown") %>% 
  group_by(cluster) %>% 
  count(age_range) %>% 
  mutate(prop = n / sum(n))

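# Write the full user matrix to S3, then bulk-load it into Redshift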
library(aws.s3)

s3_bucket <- "rstudio-input-output"

# Copy the matrix and rename its first column to user_id for the Redshift table
my_table <- user_matrix
colnames(my_table)[1] <- "user_id"

s3write_using(my_table, 
              FUN = write.csv,
              row.names = FALSE,
              bucket = s3_bucket, 
              object = "joshua_feldman/user_matrix")
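
# Optional check (not in the original script): confirm the object landed in S3
# before the Redshift COPY below
stopifnot(object_exists(object = "joshua_feldman/user_matrix", bucket = s3_bucket))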

# Record each column's R class so Redshift column types can be derived below
db_coltypes <- sapply(my_table, class)

integers <- sapply(my_table, 
                   function(x) {
                     # flag numeric columns whose values are all whole numbers
                     # (NA-safe), so they can be typed INT rather than DECIMAL:
                     is.numeric(x) && all(x %% 1 == 0, na.rm = TRUE)
                   })
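
# Illustration with hypothetical inputs: a column of whole-number doubles such
# as c(1, 2) is flagged TRUE, c(1.5, 2) FALSE, and a character column FALSE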

db_coltypes <- data.frame(names = names(db_coltypes),
                          type = unlist(db_coltypes),
                          integers = unlist(integers),
                          stringsAsFactors = FALSE)

db_coltypes <- db_coltypes %>% 
  # as order and column are reserved sql words, we can't have them as a variable name:
  mutate(names = ifelse(names == "order", "myorder", names),
         names = ifelse(names == "column", "mycolumn", names),
         # replace any character that is not alphanumeric or a space with an underscore:
         names = gsub("[^[:alnum:] ]", "_", names)) %>% 
  mutate(aws_type = case_when(
    grepl("factor|character",type) ~ "VARCHAR(1000)",
    grepl("integer|numeric",type) & integers == TRUE ~ "INT", 
    grepl("integer|numeric",type) & integers == FALSE ~ "DECIMAL(10,2)" # alter depending on decimal places if any in data
  )) %>% 
  mutate(aws_type = ifelse(is.na(aws_type), "VARCHAR(1000)", aws_type)) %>% 
  mutate(var_def = paste(names, aws_type, sep = " "))

table_def <- paste(db_coltypes$var_def, collapse = ", ")
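
# table_def is now a comma-separated column spec ready to splice into the
# CREATE TABLE statement below, e.g. (hypothetical):
# "user_id VARCHAR(1000), f1 DECIMAL(10,2), cluster INT"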

get_redshift_connection <- function(
  secret_location="servers/r_server/prod/credentials/scv_redshift",
  driver_location="/data/projects/_tools/RedshiftJDBC41-1.1.9.1009.jar",
  aws_region="eu-west-1"
) {
  # Set AWS region
  Sys.setenv("AWS_DEFAULT_REGION" = aws_region)
  
  # Get secret from AWS secrets manager
  secret <- get_secret_value(secret_location)$SecretString
  secret <- fromJSON(secret)
  
  # Connection string
  url <- stringr::str_interp("jdbc:redshift://${secret$redshift_host}:${secret$redshift_port}/${secret$redshift_dbname}?user=${secret$redshift_username}&password=${secret$redshift_password}")
  
  # Load driver
  driver <- JDBC("com.amazon.redshift.jdbc41.Driver", driver_location, identifier.quote="'")
  
  # Open and return connection
  return(dbConnect(driver, url))
}

conn <- get_redshift_connection()

# create table in redshift:
dbSendUpdate(conn, 
             paste0(
               "DROP TABLE IF EXISTS central_insights_sandbox.jf_user_matrix;
               CREATE TABLE central_insights_sandbox.jf_user_matrix (",  table_def , ");"
             )
)

# create the copy statement:
redshift_copy_statement <-
  paste0(
    "COPY central_insights_sandbox.jf_user_matrix FROM 's3://", s3_bucket,"/joshua_feldman/user_matrix' credentials 'aws_access_key_id=", access_key_id,
    ";aws_secret_access_key=", secret_access_key,
    ";token=", token,
    "' CSV DELIMITER ',' IGNOREHEADER 1 DATEFORMAT 'YYYY-MM-DD' ;"
  )

# execute the copy statement:
dbSendUpdate(conn, redshift_copy_statement)
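
# Optional sanity check (not in the original pipeline): the loaded row count
# should match the local data frame before access is granted
stopifnot(dbGetQuery(conn, "SELECT COUNT(*) FROM central_insights_sandbox.jf_user_matrix")[[1]] == nrow(my_table))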

dbGetQuery(conn, "GRANT SELECT ON central_insights_sandbox.jf_user_matrix
           TO GROUP central_insights
           ;")

favourite_shows <- dbGetQuery(redshift$con, "
                              SELECT a.cluster, b.top_level_editorial_object, COUNT(DISTINCT b.audience_id)
                              FROM central_insights_sandbox.dh_user_matrix")


##############################
# STAGE 3: WRITE TO SERVER   #             
##############################

# Save appended data frame
saveRDS(user_aggbycluster, "/efs/shiny-server/latent-feature-explorer/user_aggbycluster.RDS")
saveRDS(user_aggbygender, "/efs/shiny-server/latent-feature-explorer/user_aggbygender.RDS")
saveRDS(user_aggbyage, "/efs/shiny-server/latent-feature-explorer/user_aggbyage.RDS")
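
# Tidy up: close both database connections, assuming no further queries are
# needed in this session
dbDisconnect(conn)
dbDisconnect(redshift$con)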