R/survey_load_data.R

Defines functions survey_load_participations survey_load_results_historic survey_load_participants survey_participant_previous_season.single_table survey_participant_previous_season.multiple_table relative_seasons survey_participant_previous_season survey_participant_season join_surveyuser survey_variable_available check_season_dates survey_load_results

Documented in check_season_dates join_surveyuser relative_seasons survey_load_participants survey_load_participations survey_load_results survey_load_results_historic survey_participant_previous_season survey_participant_previous_season.multiple_table survey_participant_previous_season.single_table survey_participant_season survey_variable_available

##
# Load Results & Participation data

#' load a survey results set
#'
#' survey_load_results loads results set for a given survey. It handles several features:
#'  \itemize{
#'    \item mapping of column names (in the DB) to meaningful names (defined in epiwork.tables)
#'    \item merging with geographic tables (geo parameters set geographic level to use)
#'    \item selecting data from a season (automatic table and/or dates bound selection)
#' }
#'
#' Note : "global_id" is not used here we use person_id (sequential index of the user = primary key of survey_surveyuser table) instead because it is really cheaper for memory
#'
#' @param survey : survey name (as defined in epiwork.tables)
#' @param cols column list (you can use aliases)
#' @param geo code for the geo levels to add to the results set (only if the survey contains a geographic column)
#' @param date list(min=,max=) of min and max date to restrict the loaded data (use of the timestamp of the survey)
#' @param db.table override table name (to hanlde backup tables or test table)
#' @param survey.users list of id of SurveyUsers, load data only for these ids
#' @param account boolean if TRUE include "account" id (account user id). Only works if "user" column is defined
#' @param where where condition to apply on the query (caution, no check)
#' @param season if not NULL, season number, will load data for a given season, by default load all data from "pollster_results_[survey]"
#' @param channel if not null get data only for a given channel value
#' @param cols.sup supplementary column or SQL select clauses (caution no check)
#' @param gid if TRUE load gid column
#' @param debug if TRUE show query
#' @param country country name (if survey data has country column)
#' @return data.frame data of the survey
#' @family survey-load
#' @export
survey_load_results = function(survey, cols, geo=NULL, date=NULL, db.table=NULL, survey.users=NULL, debug=F, account=F, where=c(), season=NULL, channel=NULL, cols.sup=c(), gid=F, country=NULL) {
  def = survey_definition(survey)
  if( !is.null(season) ) {
    season = parse_season(season, accept.several=FALSE)
    h = season_definition(season)
    if( is.null(db.table) ) {
      db.table = h[[survey]]
    }
    if( isTRUE(def$single.table) ) {
      date = check_season_dates(h, date)
    }
    check_season = season
  } else {
    check_season = tail(get_historical_seasons(), n=1L)
  }
  msg = c()
  if( is.null(db.table) ) {
    msg = c(msg, "using default table")
    tb = def$table
  } else {
    msg = c(msg, paste("using table", db.table))
    tb = db.table
  }
  if( length(cols) == 1 && (cols == "*") ) {
    cc = "p.*"
  } else {
    cols = survey_variable_available(cols, def, check_season, country=country)
    cols = survey_aliases(cols, def)
    cc = paste0('p."', cols, '"')
    if(!isTRUE(def$create_id)) {
      cc = c('p."id" as "id"', cc)
    }
  }
  if(account) {
    cc = c('s."user_id" as "account_id"', cc)
  }
  if(gid) {
    cc = c(cc, 'p."global_id"')
  }
  if( !is.null(geo) ) {
    if( is.null(def$geo.column) ) {
      rlang::abort("geographic column not defined for this column")
    }
    geo.column = def$geo.column
    if( is.logical(geo) ) {
      # get default hierarchy list
      geo = geo_hierarchy()
    }
    gg = sapply(geo, geo_column)
    cc = c(cc, gg)
    if(debug) {
      cat("Joining geo table g, including columns", paste(gg, collapse=', '),"\n")
    }
    join = get_geo_join()
    j = paste0(' left join ', join$table,' g on g."', join$base.column,'"=p."', geo.column, '"')
    if( isTRUE( join$join.country ) ) {
      j = paste(j,' and g."country"=p."country"')
    }
  } else {
    j = ''
  }
  if( !is.null(survey.users) ) {
    where = c(where, paste0('s.id IN(', paste(survey.users, collapse=','), ')'))
  }
  if( !is.null(date) ) {
    msg = c(msg, paste("from", sQuote(format(format="%Y-%m-%d %H:%M:%S", date$min)), "to", sQuote(format(format="%Y-%m-%d %H:%M:%S", date$max))) )
    if( !is.null(date$min) ) {
      where = c(where, paste0(db_quote_var("timestamp")," >= ", db_quote_str(date$min)))
    }
    if( !is.null(date$max) ) {
      m = paste0(as.Date(date$max) + 1,' 00:00:00') # last day + 1
      where = c(where, paste0(db_quote_var("timestamp")," < ", db_quote_str(m)))
    }
  }

  #
  # Disable channel if explicitly set to NA

  if( !is.null(channel) ) {
    if( !is.na(channel) ) {
      where = c(where, paste0(db_quote_var("channel"),'=', db_quote_str(channel)))
      msg = c(msg, paste("channel=",channel))
    }
  } else {
    use_channel = get0("platform.use.channel", ifnotfound = FALSE, envir = .Share)
    if(use_channel) {
      msg = c(msg, "only empty channel")
      where = c(where, paste0(db_quote_var("channel"), "=''"))
    }
  }

  if( !is.null(country) ) {
    if( is.null(def$aliases$country) ) {
      rlang::abort("Country column not set for this table, unable to use 'country' parameter")
    }
    if(length(country) > 0) {
      country = toupper(country) # fix compat with incidence
      if(any(!country %in% .Share$COUNTRY_CODES)) {
        stop(paste("Unknown country codes :", paste(country[!country %in% .Share$COUNTRY_CODES], collapse = ",")))
      }
      country = db_quote_str(country)
      where = c(where, paste("p.", db_quote_var("country")," IN(", paste(country, collapse = ','),")"))
    }
  }

  if(length(msg) > 0) {
    msg = paste("Loading",survey, paste(msg, collapse = ", "))
    cat(msg,"\n")
  }

  cc = c(cc, cols.sup)
  cc = paste(cc, collapse=',')
  if(length(where) > 0) {
    where = paste0(' WHERE ', paste(where, collapse=' AND '))
  } else {
    where = ''
  }
  query = paste0('SELECT s.id as person_id, ',cc,' from ',tb,' p left join ', join_surveyuser('p', 's'), j, where)
  if(debug) {
    cat(query, "\n")
  }
  r = dbQuery(query)
  if( is.data.frame(r) ) {
    n = names(r)
    n = survey_aliases(n, def, revert=TRUE)
    names(r) <- n
  }

  if(isTRUE(def$create_id) && nrow(r) > 0) {
    r$id = seq_len(nrow(r))
    attr(r, 'virtual_id') <- TRUE
  }

  if(!is.null(geo)) {
    r = geo_normalize(r, columns=gg)
  }

  attr(r,'survey') <- survey
  attr(r, 'db.table') <- db.table
  attr(r, 'season') <- ifelse(!is.null(season), season, NA) # If current season (or not requested) attribute set to NA
  r
}

#' Check if provided date bounds are compatible with a season
#'
#' @param def season definition returned by \code{\link{season_definition}()}
#' @param date list() list of min, max requested dates
check_season_dates = function(def, date) {
  # All seasons data are in one single table
  # Use "dates" of the season to load data, or use limits from "date" parameter if defined (and requested dates included in the season's ones)
  # end date could be null (during the current season) & is replaced by the current data & time
  d = lapply(def$dates, function(x) { if(!is.null(x)) { as.POSIXct(x) } else { Sys.time() } })
  if( is.null(date) ) {
    date = list(min=d$start, max=d$end)
  } else {
    # Check that the requested data are included
    date = lapply(date, as.POSIXct)
    if(!is.null(date$min)) {
      if( !( date$min >= d$start && date$min <= d$end )) {
        rlang::abort(paste("Requested minimum date", date$min," is not included in the season period (", d$start,"-", d$end,")"))
      }
    } else {
      date$min = d$start
    }
    if( !is.null(date$max) ) {
      if( !( date$max >= d$start && date$max <= d$end )) {
        rlang::abort(paste("Requested minimum date", date$min," is not included in the season period (", d$start,"-", d$end,")"))
      }
    } else {
      date$max = d$end
    }
  }
  date
}


#' Check if variables are available for a given season
#'
#' This function is conservative: only variable registered with an availability clause are checked
#'
#'
#' @param variables character vector of name of variables (not db name)
#' @param survey survey name or survey_definition from \code{\link{survey_definition}()}
#' @param season season to check of variable availability
#' @param country country to check of variable availability (if platform is configured to accept it)
#' @return variables if they are available
#' @export
survey_variable_available <- function(variables, survey, season, country=NULL) {
  if(!is(survey, "survey_definition")) {
    survey = survey_definition(survey)
  }
  mapping = survey$aliases
  available = sapply(variables, function(var) {
      dv = mapping[[var]]
      if(is.null(dv)) {
        return(TRUE) # Some variable are not in the survey description, don't block it
      }
      av = attr(dv, "available")
      if(is.null(av)) {
        return(TRUE)
      }
      if(is.vector(av)) {
        # List of season values
        season %in% av
      } else {
        r = rlang::eval_tidy(av, data=list(season=season, country=country))
        if(!is.logical(r) && length(r) == 1) {
          rlang::abort("Error in survey definition for variable ", sQuote(var)," available condition must return logical of length 1")
        }
        r
      }
  })
  if(any(!available)) {
    v = variables[!available]
    rlang::warn(paste("Some variables are not available for season", season,":", paste(sQuote(v), collapse = ',')), variables=v, class="variable_not_available")
  }
  variables[available]
}

#' Create join clause to survey_surveyuser table for a survey
#' @param survey_alias table alias name of the survey table
#' @param user_alias table to use for the survey_user table
join_surveyuser = function(survey_alias, user_alias) {
  paste0(' survey_surveyuser ',user_alias,' on ',survey_alias,'.global_id=',user_alias,'.global_id ')
}


#' Get list of participant id (person_id = survey_user_id)
#' List of participants registered in weekly at least once for a given season
#' @param season season number to get, if several seasons are given use min and max of seasons dates
#' @param use.season.dates restrict to season's starting & ending dates. forced if single table model for weekly
#' @param use.min us minimal date of the given season, if FALSE use all before the end of the season (only for single table model)
#' @param country country to restrict participant
#' @family survey-load
#' @export
survey_participant_season = function(season, use.season.dates=FALSE, use.min=TRUE, country=NULL) {
  # For use of dates if single table model
  if(isTRUE(.Share$epiwork.tables$weekly$single.table)) {
    use.season.dates = TRUE
  } else {
    if(use.min) {
      rlang::warn("Use of `use.min` with multiple table cannot work ")
    }
  }
  h = season_definition(season)
  min = NULL
  max = NULL
  if(use.season.dates) {
    dates = get_season_dates(season)
    if(use.min) {
      min = dates$start
    }
    max = dates$end
  }
  w = c()
  time.col = db_quote_var("timestamp")
  if(!is.null(min)) {
    w = c(w, paste0(time.col, " >=", db_quote_str(min) ))
  }
  if(!is.null(max)) {
    w = c(w, paste0(time.col, " <=", db_quote_str(max) ))
  }
  if(can_use_country(country)) {
    w = c(w, paste0("p.", db_quote_var("country"),"=",db_quote_str(country)))
  }

  if(length(w) > 0) {
    w = paste(' where', paste(w, collapse = ' and ') )
  } else {
    w = ''
  }
  #print(w)
  query = paste0("SELECT distinct s.id as person_id from ", h$weekly," p left join ", join_surveyuser("p","s"), w)
  p = dbQuery(query)
  p$person_id
}

#' List of participants registered in weekly at least once in previous season (regarding given [season])
#' @param season season year
#' @param ids list of participants to keep
#' @param use.season.dates if TRUE restrict weekly scan to the official date of each season \code{\link{get_historical_tables}}
#' @param country country to use (only on european platform)
#' @param from integer relative index of the oldest season to scan to (index relative to season, e.g. -1 = previous from given [season])
#' @param verbose show verbose messages
#' @family survey-load
#' @export
survey_participant_previous_season = function(season, ids=NULL, use.season.dates=F, from=NULL, country=NULL, verbose=FALSE) {
  season = parse_season(season)
  if(!is.null(from)) {
    if(!is.integer(from)) {
      rlang::abort("from must be integer vector")
    }
    if(any(from >= 0)) {
      rlang::abort("from must be a vector of negative integers")
    }
  }
 if(isTRUE(.Share$epiwork.tables$weekly$single.table)) {
    survey_participant_previous_season.single_table(season=season, ids=ids, from=from, country=country, verbose=verbose)
  } else {
    survey_participant_previous_season.multiple_table(season=season, ids=ids, use.season.dates=use.season.dates, from=from, country=country, verbose=verbose)
  }
}

#' Get relative season numbers to a reference season number
#'
#' @param season int reference season
#' @param index int vector of relative index to the reference season, if NA or NULL get all previous
#' @param .all int[] only select on this seasons, if not NULL
#' @keywords internal
#' @return vector of season numbers matching the index relative to season reference
#' @export
relative_seasons = function(season, index=NULL, .all=NULL) {
  all.seasons = as.integer(get_historical_seasons())
  if(!is.null(.all)) {
    all.seasons = all.seasons[all.seasons %in% .all]
  }
  if( is.null(index) || is.na(index) ) {
    seasons = all.seasons
  } else {
    index = as.integer(index)
    min = -length(all.seasons)
    if(any(is.na(index) | index < min | index >= 0)) {
      rlang::abort(paste0("`from` should be a negative integer value (min ",min,"), given ", index))
    }
    if(any(index > 0)) {
      rlang::abort("from should be a negative")
    }
    seasons = season + index # seasons are relative index to [season]
  }
  seasons = seasons[seasons < season] # exclude given
  seasons = seasons[seasons %in% all.seasons] # keep only valid seasons values
  seasons
}

#' Implementation for multiple table data model
#' we have to check season by season
#' @rdname survey_participant_previous_season
survey_participant_previous_season.multiple_table = function(season, ids=NULL, use.season.dates=F, from=NULL, country=NULL, verbose=TRUE) {
  message(paste0("survey_participant_previous_season: multiple table for ", season, "use.date=", use.season.dates, " from:",from, " country:", country))
  seasons = relative_seasons(season, index=from)
  previous = c()
  for(s in seasons) {
    p = survey_participant_season(s, use.season.dates = use.season.dates, use.min = FALSE)
    previous = unique(c(previous, p))
  }
  if( !is.null(ids) ) {
    previous = previous[ previous %in% ids]
  }
  previous
}

#' Implementation for multiple table data model
#' we have to check season by season
#' @rdname survey_participant_previous_season
survey_participant_previous_season.single_table = function(season, ids=NULL, from=NULL, country=NULL, verbose=TRUE) {
  message(paste0("survey_participant_previous_season: single table for ",season," from=", from," in country ", country))
  if(is.null(from)) {
    prev.season = relative_seasons(season, index=-1L) # get just previous season because we use it as max date
    if(verbose) {
      message("previous season =", prev.season)
    }
    if(length(prev.season) == 0L) {
      if(verbose) {
        message("no previous season =", season)
      }
      return(ids)
    }
    prev.season = min(prev.season)
    # Only use the season as a maximum
    previous = survey_participant_season(prev.season, use.min = FALSE, use.season.dates = TRUE, country=country)
  } else {
    seasons = min(relative_seasons(season, index=from))
    previous = survey_participant_season(seasons[1L], use.min=TRUE, use.season.dates = TRUE, country=country)
  }
  if( !is.null(ids) ) {
    previous = previous[ previous %in% ids]
  }
  previous
}


#' Load participants data
#' @param active.account logical only active user account if TRUE
#' @param ids list of survey_user ids
#' @family survey-load
#' @export
survey_load_participants = function(active.account=NULL, ids=NULL) {

  where = c()
  join = c()
  select = c()
  if( isTRUE(active.account) ) {

    join = c(join, 'left join auth_user a on a.id=s.user_id')
    where = c(where, 'a.is_active=True')
    select = c(select, 'a.last_login' ,'a.date_joined')
  }

  if( !is.null(ids) ) {
    where = c(where, paste0('s.id in(', paste(ids, collapse = ','),')'))
  }

  where = if(length(where)> 0) paste0(' WHERE ', paste(where, collapse = ' AND ')) else ''
  join = if(length(join) > 0) paste(join, collapse = ' ')
  select = if(length(select) >0) paste0(',', paste(select, collapse = ',')) else ''

  r = dbQuery('select s.id as person_id, s.global_id, s.user_id, deleted',select,' from survey_surveyuser s ', join, where)

  class(r) <- c('gn_participants', class(r))

  r
}

#' Load historical data for a set of users
#' @param ids list of participants (survey_user.id or results from survey_load_participants)
#' @param survey survey shortname
#' @param cols list of columns to load
#' @export
#' @family survey-load
#' @importFrom methods is
survey_load_results_historic = function(ids, survey, cols) {
  if(!requireNamespace("dplyr")) {
    stop("dplyr required to use this function")
  }
  years = sort(names(.Share$historical.tables), decreasing = TRUE)

  if( is(ids, 'gn_participants') ) {
    ids = ids$person_id
  }

  intakes = NULL

  for(year in years) {

    cat(year, "\n")

    if( !is.null(intakes) ) {
      ids = ids[ !ids %in% intakes$person_id ]
    }

    if(length(ids) == 0) {
      break()
    }

    ii = survey_load_results(survey, cols=cols, survey.users = ids, season = year)
    ii = keep_last_survey(ii)

    if(survey == "intake") {
      ii$code_com = as.character(ii$code_com)
      ii$occup.place.com = as.character(ii$occup.place.com)
      ii$vacc.date = as.Date(as.character(ii$vacc.date))
    }
    cat(nrow(ii),"\n")
    ii$season = year
    intakes = dplyr::bind_rows(intakes, ii)
  }
  intakes
}


#' Load last participation date for intake & weekly survey for each participants
#' @param ids list of participants
#' @param years list of season to scan, all if NULL
#' @family survey-load
#' @export
survey_load_participations = function(ids, years=NULL) {

  if(!requireNamespace("dplyr")) {
    stop("dplyr required to use this function")
  }

  if(is.null(years)) {
    years = sort(names(.Share$historical.tables), decreasing = TRUE)
  }

  if( is(ids, 'gn_participants') ) {
    rr = ids
  } else {
    rr = survey_load_participants(ids=ids)
  }
  gids = paste0("'", paste(rr$global_id, collapse="','"),"'")
  participations = NULL

  `%>%` <- dplyr::`%>%`

  for(year in years) {

    h = season.def(year)

    ii = dbQuery("select global_id, timestamp from ", h$intake, " where global_id in (", gids,")")

    ww = dbQuery("select global_id, timestamp from ", h$weekly, " where global_id in (", gids,")")

    ii = ii %>%
      dplyr::group_by(global_id) %>%
      dplyr::summarise(intake=dplyr::n())
    ww = ww %>%
      dplyr::group_by(global_id) %>%
      dplyr::summarise(weekly=dplyr::n())

    ii = merge(ii, ww, by="global_id", all=TRUE)

    if(nrow(ii) > 0) {
      ii$season = year
    }

    participations = dplyr::bind_rows(participations, ii)
  }
  participations
}
cturbelin/ifnBase documentation built on Nov. 5, 2023, 12:54 p.m.