R/process_r04.R

validate_data_r04 <- function(data){

  names(data) <- toupper(names(data))

  stopifnot("RUT" %in% names(data))
  stopifnot("PERIODO" %in% names(data))
  stopifnot(ncol(data) == 2)

  data <- data %>%
    distinct(RUT, PERIODO) %>%
    mutate_all(as.numeric) %>%
    arrange(PERIODO, RUT) %>%
    mutate(ID = row_number()) %>%
    select(everything(), ID)

  data

}

get_r04_aux <- function(drut, periodos = c(1, 6 , 12), r04_con_cmr = FALSE) {

  max_p_2 <- max(periodos) + 2
  # x fecha observacion
  # y fecha sbif
  # > --- DIFF positivo --- > (periodo observacion) ----- DIFF negativo --->

  tblr04 <- ifelse(r04_con_cmr, "r04_bco_cmr", "r04")

  message("Usando: ", tblr04)

  dr04 <- tbl(drut$src$con, tblr04) %>%
    add_year_month() %>%
    left_join(add_year_month(drut), ., by = "RUT") %>%
    mutate(DIFF = 12 * (YEAR.x - YEAR.y) + (MONTH.x - MONTH.y)) %>%
    select(ID, RUT, ends_with("x"), ends_with("y"), DIFF, everything()) %>%
    filter(DIFF > 0, DIFF <= max_p_2) %>%
    mutate(
      SALDO = D_DIRECTA + D_MOROSA + D_VENCIDA + D_CASTIGADA,
      D_MVC = D_MOROSA + D_VENCIDA + D_CASTIGADA,
      SALDO_MAS_LINEA = D_DIRECTA + D_MOROSA + D_VENCIDA + D_CASTIGADA + LINEA_DISP
    )

  name <- "r04_aux"

  remove_if_exists(drut$src$con, name)

  qs <- db_sql_render(dr04$src$con, dr04) %>%
    as.character() %>%
    str_replace("FROM", sprintf("INTO %s FROM", name))

  message("computing ", name)
  dbSendQuery(dr04$src$con, qs)

  dr04 <- tbl(dr04$src$con, name)

  dr04

}

get_r04_join_dlm <- function(dr04, periodos = c(1, 6, 12), desfase) {

  message("Usando ", desfase, " meses de desfase")

  get_r04_summarize_by_per <- function(dr04, p){ # p <- 6

    message("PERIODO ", p)

    daux <- dr04 %>%
      select(-contains(".x"), -contains(".y"), -RUT) %>%
      filter(between(DIFF, desfase, p + desfase - 1)) # %>% # count(DIFF) %>% collect() %>% View()

    d1 <- daux %>%
      select(-DIFF) %>%
      group_by(ID) %>%
      summarise_all(.funs = funs(MEAN = mean, MAX = max, MIN = min, SUM = sum))

    d2 <- daux %>%
      group_by(ID) %>%
      summarise(
        N = n(),
        RATIO_DDA_MOR_MAX_DDA_VIG_MEAN = ifelse(sum(D_DIRECTA) > 0, max(as.double(D_MOROSA))/mean(as.double(D_DIRECTA)), 0),
        # perc_dda_con_cup o RDD
        PORC_DDA_LINEA  = ifelse(max(DCCONSUMO) + max(LINEA_DISP) > 0, sum(as.double(DCCONSUMO)) / (as.double(sum(DCCONSUMO) + sum(LINEA_DISP)))  , 0)
      )

    d3 <- daux %>% filter(D_MOROSA > 0) %>% group_by(ID) %>% summarise(D_MOR_REC = min(DIFF) - desfase)
    d4 <- daux %>% filter(D_MOROSA + D_VENCIDA > 0) %>% group_by(ID) %>% summarise(D_MORVEN_REC = min(DIFF) - desfase)
    d5 <- daux %>% filter(D_MVC > 0) %>% group_by(ID) %>% summarise(D_MORVENCAS_REC = min(DIFF) - desfase)

    d6 <- daux %>% filter(DCCONSUMO > 0) %>% group_by(ID) %>% summarise(D_CON_ANT = max(DIFF) - desfase + 1)
    d7 <- daux %>% filter(DHIPOTECAR> 0) %>% group_by(ID) %>% summarise(D_HIP_ANT = max(DIFF) - desfase + 1)
    d8 <- daux %>% filter(D_DIRECTA > 0) %>% group_by(ID) %>% summarise(MESES_BANC = n())

    daux <- list(d1, d2, d3, d4, d5, d6, d7, d8) %>% reduce(left_join, by = "ID")

    vars <- daux$ops$args$vars$alias
    varsnew <- paste(vars, str_pad(p, 2, pad = "0"), sep = "_")
    varsnew[1] <- vars[1]
    varsnew <- as.list(set_names(vars, varsnew))

    daux <- daux %>% select_(.dots = varsnew)

    daux


  }

  data_join <- map(periodos, get_r04_summarize_by_per, dr04 = dr04) %>%
    reduce(full_join, by = "ID")

  name <- "r04_join_dlm"
  remove_if_exists(dr04$src$con, name)

  qs <- db_sql_render(data_join$src$con, data_join) %>%
    as.character() %>%
    str_replace("FROM", sprintf("INTO %s FROM", name))

  message("computing ", name)
  dbSendQuery(dr04$src$con, qs)

  data_join <- tbl(dr04$src$con, name)

  data_join

}

get_r04_tend_by_pers <- function(ddlm, periodos = c(1, 6, 12), tblname) {

  vars <- ddlm$ops$vars  %>%
    str_subset("MEAN") %>%
    str_replace_all("_[0-9]+", "") %>%
    unique()

  miniqs <- expand.grid(v = vars, p1 = periodos, p2 = periodos) %>%
    tbl_df() %>%
    filter(p1 < p2) %>%
    mutate_if(is.factor, as.character) %>%
    mutate_at(vars(p1, p2), str_pad, width = 2, pad = 0) %>%
    mutate(
      a = pmap_chr(., function(v, p1, p2) str_glue("case when {var}_{p2} > 0 then CAST({var}_{p1} AS float)/CAST({var}_{p2} AS float) else NULL end", var = v, p1 = p1, p2 = p2)),
      b = pmap_chr(., function(v, p1, p2) str_glue("{var}_TEND_{p1}_{p2}", var = v, p1 = p1, p2 = p2)),
      miniq = map2_chr(a, b, function(a, b) str_glue("{a} as {b}", a = a, b = b))
    ) %>%
    pull(miniq) %>%
    str_c(collapse = ", ")

  miniqs2 <- expand.grid(v = setdiff(vars, str_subset(vars, "MAX")), p = periodos) %>%
    tbl_df() %>%
    mutate_if(is.factor, as.character) %>%
    mutate(p = str_pad(p, width = 2, pad = 0), v = str_replace_all(v, "_MEAN", "")) %>%
    mutate(
      a = map2_chr(v, p, function(v, p) str_glue("case when {var}_MAX_{p} > 0 then CAST({var}_MIN_{p} as float)/CAST({var}_MAX_{p} AS float) else NULL end", var = v, p  = p)),
      b = map2_chr(v, p, function(v, p) str_glue("{var}_RATIO_MIN_MAX_{p}", var = v, p  = p)),
      miniq = map2_chr(a, b, function(a, b) str_glue("{a} as {b}", a = a, b = b))
    )%>%
    pull(miniq) %>%
    str_c(collapse = ", ")

  name <- str_c(tblname, "_variables")
  remove_if_exists(ddlm$src$con, name)

  qs <- str_glue(
    "SELECT B.PERIODO, B.RUT, A.*, { fields } FROM {tbl} A LEFT JOIN {tblori} B ON A.ID = B.ID",
    fields = miniqs,
    # fields2 = miniqs2,
    tbl = as.character(ddlm$ops$x),
    tblori = tblname
  ) %>%
    as.character() %>%
    str_replace("FROM", sprintf("INTO %s FROM", name))

  qs

  message("computing ", name)
  dbSendQuery(ddlm$src$con, qs)

  dvar <- tbl(ddlm$src$con, name)

  dvar

}

#' Function que calcula variables dado una tabla de rut periodo
#'
#' Function que calcula variables dado una tabla de rut periodo y una conexion
#' @param con una conexion odbc
#' @param data data frame con rut y periodo
#' @param periodos vector numerico con periodos a calcular tendencias, resumentes, etc
#' @param collect collect
#' @param r04_con_cmr Si considera la r04 con cmr o la original
#' @param desfase El desfase de meses para el calculo devariables
#'
#' @importFrom dplyr mutate mutate_if mutate_at summarise row_number everything collect
#'   filter group_by summarise_all between contains tbl left_join ends_with starts_with
#'   distinct select select_ arrange pull funs tbl_df n mutate_all n_distinct data_frame
#' @importFrom purrr map2 map2_chr map set_names reduce pmap_chr pmap
#' @importFrom stringr str_c str_pad str_replace str_replace_all str_subset str_glue str_length
#' @importFrom dbplyr db_sql_render
#' @export
get_r04_vars <- function(con, data, periodos = c(1, 3, 6, 12), collect = TRUE, r04_con_cmr = FALSE, desfase = 2) {

  t0 <- Sys.time()

  message("Starting at: ", as.character(t0))

  dbSendQuery(con, "USE MATRIX")
  remove_if_exists(con, "r04_aux")
  remove_if_exists(con, "r04_join_dlm")
  # dbSendQuery(con, "DBCC SHRINKFILE(MATRIX_log, 1)")

  name <- create_name(data, prefix = ifelse(r04_con_cmr, "r04_con_cmr", "r04"))

  data <- validate_data_r04(data)
  drut <- upload_data(con, data, tblname = name)
  dr04 <- get_r04_aux(drut, periodos, r04_con_cmr = r04_con_cmr)
  ddlm <- get_r04_join_dlm(dr04, periodos, desfase = desfase)
  dtnd <- get_r04_tend_by_pers(ddlm, periodos, tblname = name)

  if(collect) {
    message("Collecting ", name)
    dtnd <- collect(dtnd)
  }

  message("Ending at: ", as.character(Sys.time()))
  print(difftime(Sys.time(), t0))

  dtnd

}
jbkunst/modflblla documentation built on June 21, 2019, 12:53 p.m.