claims_db/phclaims/stage/tables/load_stage.mcare_elig_demo.R

# Header ####
  # Author: Danny Colombara
  # Date: August 28, 2019
  # Purpose: Create stage.mcare_elig_demo for SQL

## Set up R Environment ----
  rm(list=ls())  # clear memory
  pacman::p_load(data.table, odbc, DBI, tidyr, lubridate) # load packages
  options("scipen"=999) # turn off scientific notation  
  options(warning.length = 8170) # get lengthy warnings, needed for SQL
  
  start.time <- Sys.time()
  
  yaml.url <- "https://raw.githubusercontent.com/PHSKC-APDE/claims_data/main/claims_db/phclaims/stage/tables/load_stage.mcare_elig_demo.yaml"
  
## (1) Connect to SQL Server ----    
  db_claims <- dbConnect(odbc(), "PHClaims51")   
  
## (2) Load data from SQL ----  
  mbsf <- setDT(odbc::dbGetQuery(db_claims, "SELECT bene_id, bene_birth_dt, bene_death_dt, sex_ident_cd, bene_enrollmt_ref_yr, rti_race_cd,zip_cd FROM PHClaims.stage.mcare_mbsf"))
  setnames(mbsf, names(mbsf), c("id_mcare", "dob", "death_dt", "sex", "year", "race", "zip_code"))  
  
## (3) Create date of birth indicator as a data.table ----
  # Want to keep the most recent date b/c assume it is a correction in Medicare registration ... discussed with Eli/Alastair
  dob <- unique(mbsf[, c("id_mcare", "dob", "year")])
  setorder(dob, id_mcare, -year) # sorts by id_mcare with year is descending order
  dob[, most.recent := seq(1:.N)==1, by = "id_mcare"] # identify the row for the most recent year
  dob <- dob[most.recent == T] # keep only the most recent row
  dob <- dob[, c("most.recent", "year"):=NULL] # delete columns no longer of use   
  if(nrow(dob) - length(unique(dob$id_mcare)) != 0){
    stop('non-unique id_mcare in dob')
  } # confirm all id_mcare are unique
  
## (4) Create King County ever indicator (moment by moment KC status will be in timevar table) ----
  kc.zips <- fread("https://raw.githubusercontent.com/PHSKC-APDE/reference-data/main/spatial_data/zip_admin.csv") #github file with KC zipcodes
  kc <- copy(mbsf[, c("id_mcare", "zip_code", "year")])
  kc[, geo_kc_ever := as.numeric(suppressWarnings(as.integer(zip_code)) %in% kc.zips$zip)] #creating the new var kc (a 0/1 var) 
  kc <- unique(kc[, c("id_mcare", "geo_kc_ever", "year")])
  kc[, geo_kc_ever := sum(geo_kc_ever, na.rm = T), by = id_mcare] # sum # of times zip was in KC
  kc[geo_kc_ever>1, geo_kc_ever := 1]# geo_kc_ever should be either 0 | 1, so if >1, replace with 1
  kc <- unique(kc[, .(id_mcare, geo_kc_ever)])
  if(nrow(kc) - length(unique(kc$id_mcare)) != 0){
    stop('non-unique id_mcare in kc')
  }  # confirm all id_mcare are unique
  rm(kc.zips)

## (5) Create sex indicator as a data.table ----
  sex <- unique(mbsf[, .(id_mcare, sex, year)])
  setorder(sex, id_mcare, -year) # sort so most recent year is first for each id
  # identify the most recent gender for gender_recent
      sex.recent <- copy(sex)
      sex.recent[, ordering := 1:.N, by = id_mcare] # identify the most recent row for each id  
      sex.recent <- sex.recent[ordering == 1, ]
      sex.recent[sex==0, gender_recent := "Unknown"][sex==1, gender_recent := "Male"][sex==2, gender_recent := "Female"]
      sex.recent[, c("sex", "year", "ordering") := NULL]
  # back to processing all the sex data
      sex[, gender_female:=0][sex==2, gender_female := 1] # identify females
      sex[, gender_male:=0][sex==1, gender_male := 1] # identify males
      sex <- sex[, lapply(.SD, sum, na.rm = TRUE), by = id_mcare, .SDcols = c("gender_female", "gender_male")] # sum/collapse/aggregate the sex variables by ID
      sex[gender_female>1, gender_female := 1][gender_male>1, gender_male := 1] # female and male should be either 0 | 1, so if >1, replace with 1.
      sex[gender_female==1 & gender_male == 0, gender_me := "Female"][gender_female==0 & gender_male == 1, gender_me := "Male"]
      sex[gender_female==0 & gender_male == 0, gender_me := "Unknown"][gender_female==1 & gender_male == 1, gender_me := "Multiple"]
      sex[, gender_female_t := NA_integer_] # in mcaid, this is the percent of time data show female. However, non-sensical for mcare
      sex[, gender_male_t := NA_integer_] # in mcaid, this is the percent of time data show male. We decided not to do this for Medicare.
  # merge on gender_recent
      sex <- merge(sex, sex.recent, by = "id_mcare", all = TRUE)
      rm(sex.recent)
  # confirm all id_mcare are unique
      if(nrow(sex) - length(unique(sex$id_mcare)) != 0){
        stop('non-unique id_mcare in sex')
      } 

## (6) Create race indicator as a data.table ----
  #For race, use RTI_RACE_CD rather than BENE_RACE_CD because better allocation of Hipanic and Asian
  # Prep race 
      race <- unique(mbsf[, c("id_mcare", "race", "year")])
      setorder(race, id_mcare, -year) # sort so most recent year is first for each id
      race <- race[!is.na(id_mcare)]
      race[, race_asian:=NA] # RTI method cannot be used to identify Asian
      race[, race_nhpi:=NA] # RTI method cannot be used to identify NHPI
      race[race==0, race_unk := 1] 
      race[race==1, race_white := 1] 
      race[race==2, race_black := 1] 
      race[race==3, race_other := 1] 
      race[race==4, race_asian_pi := 1] # RTI method groups asian and pacific islander, so there is no way to split them in our data 
      race[race==5, race_latino := 1] # Latino is treated as race in RTI/Medicare data. No way to use as ethnicity.
      race[race==6, race_aian := 1] 
  # identify the most recent race for race_recent
      race.recent <- copy(race)
      race.recent[, ordering := 1:.N, by = id_mcare] # identify the most recent row for each id  
      race.recent <- race.recent[ordering == 1, ]
      race.recent[, race_eth_recent := as.character(factor(race, levels = c(0:6), labels = c("Unknown", "White", "Black", "Other", "Asian_PI", "Latino", "AI/AN")))]
      race.recent <- race.recent[, .(id_mcare, race_eth_recent)]
      race.recent[, race_recent := NA_character_] # for medicare cannot separate Latino from race
  # back to processing all the race data
    # sum/collapse/aggregate the race variables by ID
      race.vars <- paste0("race_", c("asian", "nhpi", "unk", "white", "black", "other", "asian_pi", "latino", "aian"))
      race <- race[, lapply(.SD, sum, na.rm = TRUE), by = id_mcare, .SDcols = race.vars] 
    # if collapsed data sums >1, replace with 1
      for(var in race.vars){
        race[get(var) > 1, paste(var) := 1] 
      }
    # create race_ethnicity var
      race[race_unk==1, race_eth_me := "Unknown"]
      race[race_white==1, race_eth_me := "White"]
      race[race_black==1, race_eth_me := "Black"]
      race[race_other==1, race_eth_me := "Other"]
      race[race_asian_pi==1, race_eth_me := "Asian_PI"]
      race[race_latino==1, race_eth_me := "Latino"]
      race[race_aian==1, race_eth_me := "AI/AN"]
      race[(race_white + race_black + race_other + race_asian_pi + race_latino + race_aian + race_unk) > 1, race_eth_me := "Multiple"]
    # create race_me
      race[, race_me := NA_character_] # this cannot be calculated b/c we only use RTI race coding, which includes Hispanic as a race  
  # merge on race_recent
      race <- merge(race, race.recent, by = "id_mcare", all = TRUE)
      rm(race.recent)      

  # create indicators for percent time as race (just to copy mcaid format)
    race[, c("race_white_t", "race_black_t", "race_asian_t", "race_aian_t", "race_asian_pi_t", "race_nhpi_t", "race_other_t", "race_latino_t", "race_unk_t") := NA_integer_] # Unlike Medicaid, we decided not to do this for Medicare
  # confirm all id_mcare are unique
      if(nrow(race) - length(unique(race$id_mcare)) != 0){
        stop('non-unique id_mcare in race')
      } 
    
  # ensure that Asian and NHPI are NA (because cannot be assessed in Medicare data)
    race[, c("race_nhpi", "race_asian") := NA]

## (7) Create date of death as a data.table ----
  death <- unique(mbsf[!is.na(death_dt) & death_dt != "1900-01-01", .(id_mcare, death_dt, year)]) # copy only death data
  setorder(death, id_mcare, -year) # order by year so that when drop duplicate id_mcare below, will keep the most recent death date, which will be assumed to be a correction
  death[, dup := 1:.N, by = "id_mcare"] # identify duplicates by id_mcare
  death <- death[dup == 1, ] # keep only the most recent death date (presumably a correction)
  death <- death[, .(id_mcare, death_dt)]
  death[, death_dt := as.Date(death_dt)]
  if(nrow(death) != length(unique(death$id_mcare))){stop("Repeated id_mcare in death data ... FIX before moving on!")}
  
## (8) Merge all data.tables ----
  # tidy mbsf
  elig <- unique(mbsf[, "id_mcare"])
  # add on dob
  elig <- merge(elig, dob, by = "id_mcare", all = TRUE)
  # add on King County ever indicator
  elig <- merge(elig, kc, by = "id_mcare", all = TRUE)  
  # add on gender
  elig <- merge(elig, sex, by = "id_mcare", all = TRUE)
  # add on race
  elig <- merge(elig, race, by = "id_mcare", all = TRUE)
  # add on death 
  elig <- merge(elig, death, by = "id_mcare", all = TRUE)
  
## (9) Add time stamp ----
  elig[, last_run := Sys.time()]  

## (10) Write to SQL ----              
  # set elig columns to proper type 
    elig[, dob := as.Date(dob)]

  # Pull YAML from GitHub
    table_config <- yaml::yaml.load(httr::GET(yaml.url))
  
  # Create table ID
    tbl_id <- DBI::Id(schema = table_config$schema, 
                      table = table_config$table)  
  
  # Ensure columns are in same order in R & SQL
    setcolorder(elig, names(table_config$vars))
  
  # Write table to SQL
    dbWriteTable(db_claims, 
                 tbl_id, 
                 value = as.data.frame(elig),
                 overwrite = T, append = F, 
                 field.types = unlist(table_config$vars))

## (11) Simple QA ----
    # Confirm that all rows were loaded to SQL ----
      stage.count <- as.numeric(odbc::dbGetQuery(db_claims, "SELECT COUNT (*) FROM stage.mcare_elig_demo"))
      if(stage.count != nrow(elig))
        stop("Mismatching row count, error reading in data")    
    
    # check that rows in stage are not less than the last time that it was created ----
      last_run <- as.POSIXct(odbc::dbGetQuery(db_claims, "SELECT MAX (last_run) FROM stage.mcare_elig_demo")[[1]])
    
      # count number of rows
      previous_rows <- as.numeric(
        odbc::dbGetQuery(db_claims, 
                         "SELECT c.qa_value from
                         (SELECT a.* FROM
                         (SELECT * FROM metadata.qa_mcare_values
                         WHERE table_name = 'stage.mcare_elig_demo' AND
                         qa_item = 'row_count') a
                         INNER JOIN
                         (SELECT MAX(qa_date) AS max_date 
                         FROM metadata.qa_mcare_values
                         WHERE table_name = 'stage.mcare_elig_demo' AND
                         qa_item = 'row_count') b
                         ON a.qa_date = b.max_date)c"))
      
      if(is.na(previous_rows)){previous_rows = 0}
      
      row_diff <- stage.count - previous_rows
      
      if (row_diff < 0) {
        odbc::dbGetQuery(
          conn = db_claims,
          glue::glue_sql("INSERT INTO metadata.qa_mcare
                         (last_run, table_name, qa_item, qa_result, qa_date, note) 
                         VALUES ({format(last_run, usetz = FALSE)}, 
                         'stage.mcare_elig_demo',
                         'Number new rows compared to most recent run', 
                         'FAIL', 
                         {format(Sys.time(), usetz = FALSE)}, 
                         'There were {row_diff} fewer rows in the most recent table 
                         ({stage.count} vs. {previous_rows})')",
                         .con = db_claims))
        
        problem.row_diff <- glue::glue("Fewer rows than found last time.  
                                       Check metadata.qa_mcare for details (last_run = {format(last_run, usetz = FALSE)})
                                       \n")
      } else {
        odbc::dbGetQuery(
          conn = db_claims,
          glue::glue_sql("INSERT INTO metadata.qa_mcare
                         (last_run, table_name, qa_item, qa_result, qa_date, note) 
                         VALUES ({format(last_run, usetz = FALSE)}, 
                         'stage.mcare_elig_demo',
                         'Number new rows compared to most recent run', 
                         'PASS', 
                         {format(Sys.time(), usetz = FALSE)}, 
                         'There were {row_diff} more rows in the most recent table 
                         ({stage.count} vs. {previous_rows})')",
                         .con = db_claims))
        
        problem.row_diff <- glue::glue(" ") # no problem, so empty error message
        
      }
    
    # check that there are no duplicates ----
      # get count of unique id (each id should only appear once)
      stage.count.unique <- as.numeric(odbc::dbGetQuery(
        db_claims, "SELECT COUNT (*) 
        FROM (Select id_mcare 
        FROM stage.mcare_elig_demo
        GROUP BY id_mcare
        )t;"
            ))
      
      if (stage.count.unique != stage.count) {
        odbc::dbGetQuery(
          conn = db_claims,
          glue::glue_sql("INSERT INTO metadata.qa_mcare
                         (last_run, table_name, qa_item, qa_result, qa_date, note) 
                         VALUES (
                         {format(last_run, usetz = FALSE)}, 
                         'stage.mcare_elig_demo',
                         'Number distinct IDs', 
                         'FAIL', 
                         {format(Sys.time(), usetz = FALSE)}, 
                         'There were {stage.count.unique} distinct IDs but {stage.count} rows overall (should be the same)'
                         )
                         ",
                         .con = db_claims))
        
        problem.ids  <- glue::glue("Number of distinct IDs doesn't match the number of rows. 
                                   Check metadata.qa_mcare for details (last_run = {format(last_run, usetz = FALSE)})
                                   \n")
      } else {
        odbc::dbGetQuery(
          conn = db_claims,
          glue::glue_sql("INSERT INTO metadata.qa_mcare
                         (last_run, table_name, qa_item, qa_result, qa_date, note) 
                         VALUES ({format(last_run, usetz = FALSE)}, 
                         'stage.mcare_elig_demo',
                         'Number distinct IDs', 
                         'PASS', 
                         {format(Sys.time(), usetz = FALSE)}, 
                         'The number of distinct IDs matched number of overall rows ({stage.count.unique})')",
                         .con = db_claims))
        
        problem.ids  <- glue::glue(" ") # no problem
      }
    
    # create summary of errors ---- 
      problems <- glue::glue(
        problem.ids, "\n",
        problem.row_diff)
    
    
    
    
## (12) Fill qa_mcare_values table ----
    qa.values <- glue::glue_sql("INSERT INTO metadata.qa_mcare_values
                                (table_name, qa_item, qa_value, qa_date, note) 
                                VALUES ('stage.mcare_elig_demo',
                                'row_count', 
                                {stage.count}, 
                                {format(Sys.time(), usetz = FALSE)}, 
                                '')",
                                .con = db_claims)
    
    odbc::dbGetQuery(conn = db_claims, qa.values)


## (13) Print error messages ----
    if(problems >1){
      message(glue::glue("WARNING ... MCARE_ELIG_DEMO FAILED AT LEAST ONE QA TEST", "\n",
                         "Summary of problems in MCARE_ELIG_DEMO: ", "\n", 
                         problems))
    }else{message("Staged MCARE_ELIG_DEMO passed all QA tests")}

## The end! ----
    run.time <- Sys.time() - start.time
    print(run.time)
    
PHSKC-APDE/Medicaid documentation built on April 24, 2024, 6:09 p.m.