# R/sql_cooc.R
#
# Defines functions: subset_codes, get_codes_dict, get_cooc_batch,
# init_indexing, init_spm_cooc, filter_min_freq, sql_cooc
#
# Documented in: sql_cooc

#' Compute co-occurrence matrix on SQL file
#'
#' Performs out-of-memory co-occurrence for large databases that would not fit
#' in RAM with the classic call to build_df_cooc.
#' Patients are batched using the n_batch parameter.
#' Co-occurrence sparse matrix output is written to a new SQL file.
#' Depending on the number of codes considered, n_batch and n_cores may need
#' to be adjusted.
#' See vignette "Co-occurrence and PMI-SVD" for more details.
#'
#' @param input_path Input SQL file path.
#'                   Must contain monthly counts table 'df_monthly',
#'                   with columns 'Patient', 'Month', 'Parent_Code', 'Count'.
#'                   Also requires an index on column 'Patient' and a table of
#'                   the unique codes 'df_uniq_codes', but will create them
#'                   automatically if parameter autoindex is TRUE
#'                   (can increase input file size by 40\%).
#' @param output_path Output SQL file path for co-occurrence sparse matrix.
#'                    An existing file is only replaced if overwrite_output
#'                    is TRUE, otherwise an error is raised.
#' @param min_code_freq Filter output matrix based on code frequency: codes
#'                      whose diagonal value in the accumulated matrix is
#'                      below this threshold are removed. Values of 0 or less
#'                      disable the filter.
#' @param exclude_code_pattern Pattern of codes prefixes to exclude.
#'                             Will be used in SQL appended by '%' and in grep
#'                             prefixed by '^'.
#'                             For example, 'AB'.
#' @param exclude_dict_pattern Used in combination with codes_dict.
#'                             Pattern of codes prefixes to exclude, except if
#'                             they are found in codes_dict.
#'                             Will be used in SQL appended by '%' and/or in
#'                             grep prefixed by '^'.
#'                             For example, 'C[0-9]'.
#' @param codes_dict_fpaths Used in combination with exclude_dict_pattern.
#'                          Filepaths to define codes to avoid excluding using
#'                          exclude_dict_pattern.
#'                          First column of each file must define the code
#'                          identifiers.
#' @param n_batch Number of patients per batch.
#' @param n_cores Number of cores.
#' @param autoindex If table 'df_uniq_codes' not found in input_path,
#'                  index table 'df_monthly' on column 'Patient', and write
#'                  unique values of 'Parent_Code' to table 'df_uniq_codes'.
#' @param overwrite_output Should output_path be overwritten?
#' @param verbose Prints batch progress.
#' @param verbose_max Prints memory usage at each batch.
#' @param ... Passed to build_df_cooc
#'
#' @return None, side-effect is output SQL file creation.
#' @examples
#'
#' df_ehr = data.frame(Patient = c(1, 1, 2, 1, 2, 1, 1, 3, 4),
#'                     Month = c(1, 1, 1, 2, 2, 3, 3, 4, 4),
#'                     Parent_Code = c('C1', 'C2', 'C2', 'C1', 'C1', 'C1',
#'                                     'C2', 'C3', 'C4'),
#'                     Count = 1:9)
#'
#' library(RSQLite)
#'
#' test_db_path = tempfile()
#' test_db = dbConnect(SQLite(), test_db_path)
#' dbWriteTable(test_db, 'df_monthly', df_ehr, overwrite = TRUE)
#'
#' dbDisconnect(test_db)
#'
#' output_db_path = tempfile()
#' sql_cooc(test_db_path, output_db_path, autoindex = TRUE)
#'
#' test_db = dbConnect(SQLite(), output_db_path)
#' spm_cooc = dbGetQuery(test_db, 'select * from df_monthly;')
#' dbDisconnect(test_db)
#'
#' spm_cooc
#'
#' @export
sql_cooc = function(input_path, output_path, min_code_freq = 5,
                    exclude_code_pattern = NULL, exclude_dict_pattern = NULL,
                    codes_dict_fpaths = NULL, n_batch = 300, n_cores = 1,
                    autoindex = FALSE, overwrite_output = FALSE,
                    verbose = TRUE, verbose_max = verbose, ...) {

  stopifnot(!missing(input_path) && !missing(output_path))
  if (file.exists(output_path)) {
    if (!overwrite_output) stop('Output exists and overwrite_output is FALSE')
    file.remove(output_path)
  }
  if (length(exclude_code_pattern) > 1) {
    warning('Multiple exclude_code_pattern not tested')
  }
  if (length(exclude_dict_pattern) > 1) {
    warning('Multiple exclude_dict_pattern not tested')
  }

  # NULL when no dictionary files are given (if without else)
  codes_dict = if (!is.null(codes_dict_fpaths)) {
    get_codes_dict(codes_dict_fpaths, n_cores)
  }

  # seems we need two different connections for dbFetch to work correctly
  # Every connection is released through on.exit so that an error in any
  # later step (indexing, batching, writing) does not leak it.
  mydb_ids = dbConnect(SQLite(), input_path)
  on.exit(dbDisconnect(mydb_ids), add = TRUE)
  init_indexing(mydb_ids, autoindex, verbose)

  mydb_batch = dbConnect(SQLite(), input_path)
  on.exit(dbDisconnect(mydb_batch), add = TRUE)

  outdb = dbConnect(SQLite(), output_path)
  on.exit(dbDisconnect(outdb), add = TRUE)

  # init the sparse matrix with all the unique codes
  spm_cooc = init_spm_cooc(mydb_ids, exclude_code_pattern,
                           exclude_dict_pattern, codes_dict)
  uniq_codes = rownames(spm_cooc)

  ids_query = dbSendQuery(mydb_ids,
                          'SELECT DISTINCT Patient FROM df_monthly')
  # after = FALSE: the pending result must be cleared before its connection
  # is disconnected by the handlers registered above
  on.exit(dbClearResult(ids_query), add = TRUE, after = FALSE)

  while (!dbHasCompleted(ids_query)) {

    cooc_batch = get_cooc_batch(ids_query, mydb_batch, exclude_code_pattern,
                                exclude_dict_pattern, codes_dict,
                                uniq_codes, n_batch, n_cores,
                                verbose = verbose_max, ...)

    # a batch with no usable rows comes back as NULL and is skipped
    if (!is.null(cooc_batch)) spm_cooc = spm_cooc + cooc_batch
    rm(cooc_batch)

    if (verbose) {
      message('Batch completed')

      if (verbose_max) {
        message(paste('Full sparse matrix size:', object.size(spm_cooc)))
      }
    }
  }

  spm_cooc = filter_min_freq(spm_cooc, min_code_freq)
  spm_cooc = spm_to_df(spm_cooc)
  dbWriteTable(outdb, 'df_monthly', spm_cooc, append = TRUE)

  # same invisible value the final dbDisconnect() call used to yield
  invisible(TRUE)
}

# Drop rare codes from a square co-occurrence matrix.
#
# A code is removed (row and column) when its diagonal value is strictly
# below min_code_freq. With min_code_freq <= 0 the matrix is returned
# untouched.
#
# spm_cooc:      square co-occurrence matrix, codes on rows and columns
# min_code_freq: minimum diagonal value for a code to be kept
#
# Returns the filtered matrix, always two-dimensional.
filter_min_freq = function(spm_cooc, min_code_freq) {

  if (min_code_freq > 0) {

    rm_idxs = which(Matrix::diag(spm_cooc) < min_code_freq)

    # drop = FALSE: without it a single surviving code collapses the result
    # to a bare vector and the row/column names are lost
    if (length(rm_idxs) > 0) {
      spm_cooc = spm_cooc[-rm_idxs, -rm_idxs, drop = FALSE]
    }
  }

  spm_cooc
}

# Build the starting co-occurrence matrix from the table of unique codes.
#
# db_con: connection to the input database (reads table df_uniq_codes)
# ...:    exclusion arguments forwarded to subset_codes
#
# Returns the object produced by build_spm_cooc on an empty co-occurrence
# table, with the sorted retained codes as row names.
init_spm_cooc = function(db_con, ...) {

  df_codes = dbGetQuery(db_con, 'select Parent_Code from df_uniq_codes;')
  df_codes = subset_codes(df_codes, ...)
  sorted_codes = sort(df_codes$Parent_Code)

  # melting a 1x1 matrix and dropping its only row leaves a zero-row table
  empty_cooc = reshape2::melt(matrix())[-1, ]

  spm_init = build_spm_cooc(empty_cooc, sorted_codes)
  rownames(spm_init) = sorted_codes

  spm_init
}

# Prepare the input database for batching.
#
# Nothing is done when table df_uniq_codes already exists. Otherwise, and
# only if autoindex is TRUE, an index on df_monthly(Patient) is created and
# the distinct Parent_Code values are written to table df_uniq_codes.
#
# inputdb:   connection to the input database
# autoindex: whether the function may modify the input database
# verbose:   whether to print progress messages
#
# Stops when df_uniq_codes is missing and autoindex is FALSE.
init_indexing = function(inputdb, autoindex, verbose) {

  if (!'df_uniq_codes' %in% dbListTables(inputdb)) {

    if (!autoindex) {
      stop('Table df_uniq_codes does not exist and autoindex is FALSE')
    }

    if (verbose) message('Writing SQL index on patients')
    # IF NOT EXISTS: the index may already be there (created by the user or
    # by an earlier interrupted run) while df_uniq_codes is still missing
    dbExecute(inputdb,
              "CREATE INDEX IF NOT EXISTS patient_idx ON df_monthly (Patient)")

    if (verbose) message('Writing table of unique codes df_uniq_codes')
    df_uniq_codes = dbGetQuery(inputdb,
                               'select distinct Parent_Code from df_monthly;')

    dbWriteTable(inputdb, 'df_uniq_codes', df_uniq_codes, overwrite = TRUE)
  }
}

# Fetch the next batch of patients and compute its co-occurrence matrix.
#
# ids_query:            open result set yielding column Patient
# mydb_batch:           connection used to read the rows of df_monthly
# exclude_code_pattern: code prefixes excluded in SQL and again in R
# exclude_dict_pattern: code prefixes excluded unless found in codes_dict
# codes_dict:           codes protected from exclude_dict_pattern, or NULL
# uniq_codes:           codes passed on to build_df_cooc
# n_batch:              number of patient ids to fetch
# n_cores:              passed to build_df_cooc
# verbose:              print the size of the fetched batch
# ...:                  passed to build_df_cooc
#
# Returns the result of build_df_cooc, or NULL when no row is left after
# filtering.
get_cooc_batch = function(ids_query, mydb_batch, exclude_code_pattern,
                          exclude_dict_pattern, codes_dict, uniq_codes,
                          n_batch, n_cores, verbose, ...) {

  pat_ids_batch = dbFetch(ids_query, n_batch)$Patient

  # missing ids would be pasted as the bare token NA, which is not valid SQL,
  # and can never match inside "in (...)" anyway
  pat_ids_batch = pat_ids_batch[!is.na(pat_ids_batch)]

  # text ids must be written as SQL string literals, embedded quotes doubled
  if (is.character(pat_ids_batch)) {
    pat_ids_batch = paste0("'", gsub("'", "''", pat_ids_batch, fixed = TRUE),
                           "'")
  }

  # one clause per prefix, collapsed so that several patterns still give a
  # single query string
  sql_not_like = function(patterns) {
    paste0(" and Parent_Code not like '", patterns, "%'", collapse = '')
  }

  # using "in" is probably not so great when we have n_batch > 1e4..
  db_query = paste0('SELECT * FROM df_monthly WHERE Patient in (',
                    paste(pat_ids_batch, collapse = ', '), ")",
                    if (!is.null(exclude_code_pattern)) {
                      sql_not_like(exclude_code_pattern)
                    })

  # with a dictionary, the dict pattern can only be applied in R, since some
  # of the matching codes must be kept
  if (is.null(codes_dict) && !is.null(exclude_dict_pattern)) {
    db_query = paste0(db_query, sql_not_like(exclude_dict_pattern))
  }

  spm_cooc_batch = dbGetQuery(mydb_batch, db_query)

  if (verbose) message(paste('Batch size:', object.size(spm_cooc_batch)))

  spm_cooc_batch = subset_codes(spm_cooc_batch, exclude_code_pattern,
                                exclude_dict_pattern, codes_dict)

  spm_cooc_batch = na.omit(spm_cooc_batch)

  # no else branch: an empty batch yields NULL
  if (nrow(spm_cooc_batch) > 0) {
    build_df_cooc(spm_cooc_batch, uniq_codes, n_cores, min_code_freq = 0, ...)
  }
}

# Collect the code identifiers listed in the dictionary files.
#
# codes_dict_fpaths: paths of the dictionary files
# n_cores:           number of threads given to data.table::fread
#
# Returns the unique values found in the first column of the files.
get_codes_dict = function(codes_dict_fpaths, n_cores) {

  # first column of one dictionary file
  read_first_column = function(fpath) {
    df_dict = data.table::fread(fpath, data.table = FALSE, nThread = n_cores)
    df_dict[[1]]
  }

  all_codes = unlist(lapply(codes_dict_fpaths, read_first_column))

  unique(all_codes)
}

# Filter the rows of a table on its Parent_Code column.
#
# df_ehr:               data frame with a Parent_Code column
# exclude_code_pattern: code prefixes to exclude, or NULL
# exclude_dict_pattern: further code prefixes to exclude, or NULL
# codes_dict:           vector of codes, or NULL
#
# Behaviour:
# - no pattern and no dictionary: df_ehr is returned unchanged
# - dictionary only: keep the rows whose code is in the dictionary
# - pattern(s) only: drop the rows whose code starts with any pattern
# - both: drop the rows matching a pattern, unless the code is in the
#   dictionary
#
# Rows are selected with explicit logical indexing rather than subset(), so
# a column of df_ehr named like one of the arguments cannot shadow it.
subset_codes = function(df_ehr, exclude_code_pattern, exclude_dict_pattern,
                        codes_dict) {

  codes_pattern = c(exclude_code_pattern, exclude_dict_pattern)

  if (is.null(codes_dict) && is.null(codes_pattern)) return(df_ehr)

  codes = df_ehr[['Parent_Code']]

  if (is.null(codes_pattern)) {

    keep = codes %in% codes_dict

  } else {

    # anchor every prefix at the start of the code and match any of them
    codes_pattern = paste0('^', codes_pattern, collapse = '|')
    keep = !grepl(codes_pattern, codes)

    if (!is.null(codes_dict)) keep = keep | codes %in% codes_dict
  }

  df_ehr[keep, , drop = FALSE]
}

# Try the nlpembeds package in your browser
#
# Any scripts or data that you put into this service are public.
#
# nlpembeds documentation built on April 4, 2025, 4:41 a.m.