R/sdft_submit.R

#' Main function that runs the Station Demand Forecasting Tool
#'
#' @param dbhost Character. IP address or hostname of the machine with the PostgreSQL
#' database. Default is "localhost".
#' @param dbport Integer. Port number used by the PostgreSQL server.
#' @param dbname Character. Name of the PostgreSQL database. Default is "dafni".
#' @param dbuser Character. Name of the PostgreSQL database user. Default is "postgres". Note
#' that you need to set the password for this user using the \code{key_set()} function
#' from the keyring package before running \code{sdft_submit()}.
#' @param dirpath Character. Full path to the directory containing the input folder.
#' This folder will also be used for output and must be writable. Default is the
#' current working directory. On Windows you should use forward slashes in the path.
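#' @examples
#' \dontrun{
#' # A minimal usage sketch: needs a live PostgreSQL database loaded with the
#' # SDFT data and an input folder at <dirpath>/input. The path below is
#' # illustrative, not a shipped default.
#' keyring::key_set("postgres")  # store the database password once
#' sdft_submit(dbhost = "localhost",
#'             dbport = 5432,
#'             dbname = "dafni",
#'             dbuser = "postgres",
#'             dirpath = "~/sdft_job")
#' }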
#' @importFrom stats filter na.omit
#' @importFrom utils read.csv write.csv
#' @importFrom keyring key_get
#' @importFrom DBI dbConnect dbDriver dbWriteTable dbDisconnect
#' @importFrom parallel detectCores makeCluster clusterEvalQ clusterExport stopCluster
#' @importFrom rlang .data
#' @import doParallel
#' @import futile.logger
#' @import checkmate
#' @export

sdft_submit <-
  function(dbhost = "localhost",
           dbport = 5432,
           dbname = "dafni",
           dbuser = "postgres",
           dirpath = getwd()) {
    # Check parameters
    submit.coll <- makeAssertCollection()
    assert_character(dbhost, any.missing = FALSE, add = submit.coll)
    assert_integerish(dbport, any.missing = FALSE, add = submit.coll)
    assert_character(dbname, any.missing = FALSE, add = submit.coll)
    assert_character(dbuser, any.missing = FALSE, add = submit.coll)
    assert_character(dirpath, any.missing = FALSE, add = submit.coll)
    reportAssertions(submit.coll)

    # Configuration----------------------------------------------------------------

    # check and set directory path

    in_path <-
      file.path(dirpath, "input", fsep = .Platform$file.sep)

    assert_directory_exists(in_path, access = "w")

    # Read config file
    config <-
      read.csv(
        file = file.path(in_path, "config.csv", fsep = .Platform$file.sep),
        sep = ";",
        colClasses = c(
          "job_id" = "character",
          "method" = "character",
          "testing" = "logical",
          "loglevel" = "character",
          "cores" = "integer"
        ),
        stringsAsFactors = FALSE,
        na.strings = c("")
      )
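    # For reference, a config.csv matching the colClasses above might look
    # like this (semicolon-separated; values are illustrative):
    #
    #   job_id;method;testing;loglevel;cores
    #   newstn01;isolation;FALSE;INFO;4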

    # Check config file

    config.coll <- makeAssertCollection()

    # Check for a valid job_id: must begin with a-z, followed by 1-19 further
    # characters from a-z, 0-9 or _ (2-20 characters in total). This keeps it
    # a valid PostgreSQL schema name.
    if (isFALSE(assertTRUE(
      grepl("^[a-z][a-z0-9_]{1,19}$", config$job_id, ignore.case = FALSE),
      na.ok = FALSE,
      add = config.coll
    ))) {
      config.coll$push(
        "config$job_id is used as the database schema name and must be in a valid format."
      )
    }
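    # e.g. "newstn01" and "stn_2a" would pass; "1stn" (leading digit) and
    # "NewStn" (uppercase) would fail.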

    # check for valid method

    assert_choice(config$method,
                  c("isolation", "concurrent"),
                  null.ok = FALSE,
                  add = config.coll)

    # check testing is TRUE or FALSE

    assert_logical(config$testing, null.ok = FALSE, add = config.coll)

    # check for valid logging threshold

    valid_threshold <-
      c("FATAL",
        "ERROR",
        "WARN",
        "INFO",
        "DEBUG")

    assert_choice(config$loglevel,
                  valid_threshold,
                  null.ok = FALSE,
                  add = config.coll)

    # check for valid cores - minimum of 4
    # reset to actual cores if too high.

    if (config$cores > detectCores()) {
      config$cores <- detectCores()
    }


    if (isFALSE(testInteger(
      config$cores,
      lower = 4,
      null.ok = FALSE
    ))) {
      config.coll$push(paste0("At least 4 cores are required. This system has ",
                             detectCores(),
                             " cores"))
    }

    reportAssertions(config.coll)

    # create output directory if it doesn't already exist

    if (!dir.exists(file.path(dirpath, "output", fsep = .Platform$file.sep))) {
      dir.create(file.path(dirpath, "output", fsep = .Platform$file.sep))
    }

    # create job output folder

    out_path <-
      file.path(dirpath, "output", config$job_id, fsep = .Platform$file.sep)

    if (!dir.exists(out_path)) {
      dir.create(out_path)
    }

    threshold <- config$loglevel


    # Setting up logging-----------------------------------------------------------

    log_file <-
      file.path(out_path, "sdr.log", fsep = .Platform$file.sep)

    # delete existing log file
    if (file.exists(log_file)) {
      file.remove(log_file)
    }

    flog.appender(appender.file(log_file))
    flog.threshold(threshold)

    # capture R errors and warnings to be logged by futile.logger
    options(
      showWarnCalls = TRUE,
      showErrorCalls = TRUE,
      show.error.locations = TRUE,
      error = function() {
        flog.error(geterrmessage())
      },
      warning.expression =
        quote({
          if (exists("last.warning", baseenv()) && !is.null(last.warning)) {
            txt <- paste0(names(last.warning), collapse = " ")
            futile.logger::flog.warn(txt)
          }
        })
    )

    flog.info("Log file initialised")

    # Setting up database connection-----------------------------------------------

    # Using keyring package for storing database password in OS credential store
    # to avoid exposing on GitHub.
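    # The password is assumed to have been stored once beforehand, e.g.
    # interactively with keyring::key_set("postgres").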

    checkdb <- try(con <-
                     dbConnect(
                       RPostgres::Postgres(),
                       dbname = dbname,
                       host = dbhost,
                       port = dbport,
                       user = dbuser,
                       password = key_get(dbuser)
                     ))
    if (inherits(checkdb, "try-error")) {
      stop("Database connection has not been established")
    }

    # Setting up parallel processing-----------------------------------------------
    # This is currently used in the sdr_create_service_areas() and
    # sdr_generate_choicesets() functions, in a foreach loop.

    passwd <- key_get(dbuser)

    # Number of cluster workers from the config file (reserve 2 cores for the OS)
    cl <- makeCluster(config$cores - 2)
    registerDoParallel(cl)
    # Pass variables in this function's (sdft_submit) environment to each worker in the cluster
    clusterExport(
      cl = cl,
      varlist = c(
        "passwd",
        "dbname",
        "dbport",
        "dbhost",
        "dbuser",
        "threshold",
        "out_path"
      ),
      envir = environment()
    )

    checkcl <- try(clusterEvalQ(cl, {
      library(DBI)
      library(RPostgres)
      library(keyring)
      library(sdft)
      con <-
        dbConnect(
          RPostgres::Postgres(),
          host = dbhost,
          port = dbport,
          user = dbuser,
          dbname = dbname,
          password = passwd
        )
      NULL
    }))
    if (inherits(checkcl, "try-error")) {
      stop("clusterEvalQ failed")
    }

    # Get station crscodes----------------------------------------------------------
    query <- "select crscode from data.stations"
    crscodes <- sdr_dbGetQuery(con, query)

    # Load input data---------------------------------------------------------------

    # check that the input files exist and can be read
    file.coll <- makeAssertCollection()
    assert_file_exists(
      file.path(in_path, "freqgroups.csv", fsep = .Platform$file.sep),
      access = "r",
      add = file.coll
    )
    assert_file_exists(
      file.path(in_path, "stations.csv", fsep = .Platform$file.sep),
      access = "r",
      add = file.coll
    )
    assert_file_exists(
      file.path(in_path, "exogenous.csv", fsep = .Platform$file.sep),
      access = "r",
      add = file.coll
    )
    reportAssertions(file.coll)


    # load freqgroups.csv
    freqgroups <-
      read.csv(
        file = file.path(in_path, "freqgroups.csv", fsep = .Platform$file.sep),
        sep = ";",
        colClasses = c("group_id" = "character",
                       "group_crs" = "character"),
        stringsAsFactors = FALSE,
        na.strings = c("")
      )
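    # An illustrative freqgroups.csv (values assumed; group_crs holds
    # comma-separated CRS:frequency pairs, validated in pre-flight):
    #
    #   group_id;group_crs
    #   fg1;AAA:10,BBB:12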

    # load stations.csv
    stations <-
      read.csv(
        file = file.path(in_path, "stations.csv", fsep = .Platform$file.sep),
        sep = ";",
        colClasses = c(
          "id" = "character",
          "stn_east" = "character",
          "stn_north" = "character",
          "acc_east" = "character",
          "acc_north" = "character",
          "freq" = "integer",
          "freqgrp" = "character",
          "carsp" = "integer",
          "ticketm" = "logical",
          "busint" = "logical",
          "cctv" = "logical",
          "terminal" = "logical",
          "electric" = "logical",
          "tcbound" = "logical",
          "category" = "character",
          "abstract" = "character"
        ),
        stringsAsFactors = FALSE,
        na.strings = c("")
      )
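    # Note: stations.csv is also expected to carry "name" and "region"
    # columns (read with default character classes); both are validated in
    # the pre-flight checks below.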


    # load exogenous.csv
    exogenous <-
      read.csv(
        file = file.path(in_path, "exogenous.csv", fsep = .Platform$file.sep),
        sep = ";",
        colClasses = c(
          "type" = "character",
          "number" = "integer",
          "centroid" = "character"
        ),
        stringsAsFactors = FALSE,
        na.strings = c("")
      )
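    # An illustrative exogenous.csv (values assumed; centroid must be a
    # postcode or workplace-zone code, validated in pre-flight):
    #
    #   type;number;centroid
    #   houses;250;AB1 2CD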


    flog.info("Input files read")

    # Starting job-----------------------------------------------------------------

    # Set testing switch - If TRUE, this produces fake 60-minute proposed station
    # service areas which are actually only 5-minute service areas.
    testing <- config$testing

    # set some convenience switches
    have_freqgroups <- nrow(freqgroups) > 0
    have_exogenous <- nrow(exogenous) > 0
    isolation <- config$method == "isolation"

    cat(
      paste0(
        "INFO [",
        format(Sys.time()),
        "] ",
        "Starting job. Logging threshold is ",
        threshold,
        "\n"
      ),
      file = file.path(log_file),
      append = TRUE
    )

    flog.info(paste0("Testing mode: ", ifelse(isTRUE(testing), "ON", "OFF")))

    # Check if the postcode_polygons table is available

    query <- paste0(
      "select exists (
   select from information_schema.tables
   where table_schema = 'data'
   and table_name = 'postcode_polygons'
)"
    )
    pcpoly <- sdr_dbGetQuery(con, query)
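    # pcpoly$exists (TRUE/FALSE) is passed to sdr_create_json_catchment()
    # below, presumably so catchments can be built from postcode polygons
    # when that table is available.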

    # Pre-flight checks-------------------------------------------------------------

    preflight_failed <- FALSE

    # station checks---------------------------------------------------------------

    # create location column in stations (convert to numeric)
    stations$location <-
      paste0(as.numeric(stations$acc_east),
             ",",
             as.numeric(stations$acc_north))
    # rename id column
    colnames(stations)[1] <- "crscode"

    # Check that the ids (crscodes) are not used by any existing station

    idx <- which(stations$crscode %in% crscodes$crscode)

    if (length(idx) > 0) {
      preflight_failed <- TRUE
      flog.error(
        paste0(
          "The following station ids match existing station crscodes and cannot be used: ",
          paste(stations$crscode[idx], collapse = ", ")
        )
      )
    }

    # check crscode is unique
    if (anyDuplicated(stations$crscode) > 0) {
      preflight_failed <- TRUE
      flog.error("Station id must be unique")
    }

    # check each row has a valid station name - must be a non-empty
    # alphanumeric string (spaces allowed), not NA
    if (isFALSE(all(
      vapply(stations$name, function(x)
        grepl("^[a-zA-Z0-9 ]+$", x), logical(1), USE.NAMES = FALSE)
    ))) {
      preflight_failed <- TRUE
      flog.error("Station name must be an alphanumeric string of length >= 1")
    }

    # check region is valid
    check_region <- function(region) {
      query <- paste0("select '",
                      region,
                      "' IN (select region from data.regional_uplifts)")
      as.logical(sdr_dbGetQuery(con, query))
    }

    idx <-
      which(
        vapply(stations$region, function(x)
          check_region(x), logical(1), USE.NAMES = FALSE) == FALSE
      )

    if (length(idx) > 0) {
      preflight_failed <- TRUE
      flog.error(paste0(
        "The following region(s) in station input are not valid: ",
        paste(stations$region[idx], collapse = ", ")
      ))
    }

    # station and access coordinates must be six digit strings 0-9
    if (isFALSE(all(vapply(stations$stn_east, function(x)
      grepl("^[0-9]{6}$", x), logical(1))))) {
      preflight_failed <- TRUE
      flog.error("Station eastings must be 6 character strings containing 0-9 only")
    }
    if (isFALSE(all(vapply(stations$stn_north, function(x)
      grepl("^[0-9]{6}$", x), logical(1))))) {
      preflight_failed <- TRUE
      flog.error("Station northings must be 6 character strings containing 0-9 only")
    }
    if (isFALSE(all(vapply(stations$acc_east, function(x)
      grepl("^[0-9]{6}$", x), logical(1))))) {
      preflight_failed <- TRUE
      flog.error("Access eastings must be 6 character strings containing 0-9 only")
    }
    if (isFALSE(all(vapply(stations$acc_north, function(x)
      grepl("^[0-9]{6}$", x), logical(1))))) {
      preflight_failed <- TRUE
      flog.error("Access northings must be 6 character strings containing 0-9 only")
    }

    # check if access location coordinates fall within GB extent
    check_coords <- function(coords) {
      query <- paste0(
        "
    select st_intersects(st_setsrid(st_makepoint(",
        coords,
        "),27700), geom)
    from data.gb_outline;
    "
      )
      as.logical(sdr_dbGetQuery(con, query))
    }

    idx <-
      which(
        vapply(stations$location, function(x)
          check_coords(x), logical(1), USE.NAMES = FALSE) == FALSE
      )
    if (length(idx) > 0) {
      preflight_failed <- TRUE
      flog.error(paste0(
        "The following station access points do not fall within GB: ",
        paste0(stations$crscode[idx], ": ", stations$location[idx], collapse = ", ")
      ))
    }

    # If concurrent:
    # Check station name is unique
    if (isFALSE(isolation)) {
      if (anyDuplicated(stations$name) > 0) {
        preflight_failed <- TRUE
        flog.error("Station name must be unique for concurrent mode")
      }
      # Check abstraction string is same for all stations (even if NA)
      if (length(unique(stations$abstract)) > 1) {
        preflight_failed <- TRUE
        flog.error(
          "Defined abstraction stations must be identical for all stations in concurrent mode"
        )
      }
      # Check same frequency group id is specified for every station (even if NA).
      if (isTRUE(have_freqgroups)) {
        if (length(unique(stations$freqgrp)) > 1) {
          preflight_failed <- TRUE
          flog.error(
            "When using concurrent mode the same frequency group must be specified for every station"
          )
        }
      }
    }

    # If isolation:
    # Check that any repeated station names only differ in respect of:
    # crscode, freq, freqgrp, carsp
    if (isTRUE(isolation)) {
      # remove the columns that are allowed to differ
      if (nrow(
        stations %>% dplyr::select(-.data$crscode, -.data$freq, -.data$freqgrp, -.data$carsp) %>%
        # distinct should now only return as many rows as there are unique
        # station names
        dplyr::distinct()
      ) != length(unique(stations$name))) {
        preflight_failed <- TRUE
        flog.error(
          "In isolation mode stations with the same name can only differ in the values of id, freq, freqgrp, and carsp"
        )
      }
    }

    # check that frequency is integer > 0
    if (isFALSE(testIntegerish(stations$freq, lower = 1, any.missing = FALSE))) {
      preflight_failed <- TRUE
      flog.error("Service frequency must be an integer > 0")
    }
    # check that carsp is integer >= 0
    if (isFALSE(testIntegerish(stations$carsp, lower = 0, any.missing = FALSE))) {
      preflight_failed <- TRUE
      flog.error("Parking spaces must be an integer >= 0")
    }

    # check that station category is E or F
    if (isFALSE(testSubset(stations$category,
                           c("E", "F")))) {
      preflight_failed <- TRUE
      flog.error("Category must be either 'E' or 'F'")
    }

    # check abstraction station format is correct, if not NA
    # i.e. check for CSV input of three uppercase characters separated by commas
    # (also allowing single crscode with no comma). Maximum of three crscodes.
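    # e.g. "AAA", "AAA,BBB" or "AAA,BBB,CCC" would pass; "AAA,BB" or a
    # fourth crscode would fail.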
    abs_check <-
      vapply(na.omit(stations$abstract), function(x)
        grepl("^([A-Z]{3})(,[A-Z]{3}){0,2}$", x), logical(1), USE.NAMES = FALSE)
    if (isFALSE(all(abs_check))) {
      preflight_failed <- TRUE
      flog.error("Abstraction stations are not provided in the correct format")
    }

    # check that abstraction stations (if there are any) have valid crscodes
    if (length(unique(na.omit(stations$abstract))) > 0) {
      abs_crs <- unique(na.omit(unlist(strsplit(
        stations$abstract, ","
      ))))
      # get the index for those not valid
      idx <- which(!(abs_crs %in% crscodes$crscode))
      if (length(idx) > 0) {
        preflight_failed <- TRUE
        flog.error(paste(
          "The following abstraction group crscodes are not valid: ",
          paste(abs_crs[idx], collapse = ", ")
        ))
      }
    }

    # Frequency group checks---------------------------------------------------------
    if (isTRUE(have_freqgroups)) {
      # check frequency group format is ok
      fg_pairs <- unlist(strsplit(freqgroups$group_crs, ","))
      # check if all pairs match the required format
      if (isFALSE(all(
        vapply(fg_pairs, function(x)
          grepl("^[A-Z]{3}:[0-9]{1,}$", x), logical(1), USE.NAMES = FALSE)
      ))) {
        preflight_failed <- TRUE
        flog.error("Format of frequency groups is not correct")
      }

      # check crscodes in frequency groups are all valid
      # get the unique crscodes from pairs
      fg_crs <-
        unique(vapply(fg_pairs, function(x)
          sub(":.*", "", x), character(1), USE.NAMES = FALSE))
      # get the index for those not valid
      idx <- which(!(fg_crs %in% crscodes$crscode))
      if (length(idx) > 0) {
        preflight_failed <- TRUE
        flog.error(paste(
          "The following frequency group crscodes are not valid: ",
          paste(fg_crs[idx], collapse = ", ")
        ))
      }
    }

    # exogenous checks--------------------------------------------------------------
    if (isTRUE(have_exogenous)) {
      # check exogenous number column is positive integer for all rows
      if (!testIntegerish(exogenous$number,
                          lower = 1,
                          any.missing = FALSE)) {
        preflight_failed <- TRUE
        flog.error("Exogenous number values must all be positive integers")
      }

      # check valid centroids are used for the exogenous inputs.
      # If type is 'population' or 'houses' the value can only be assigned to
      # a postcode centroid.
      check_pc_centroids <- function(centroid) {
        query <- paste0("select '",
                        centroid,
                        "' IN (select postcode from data.pc_pop_2011)")
        as.logical(sdr_dbGetQuery(con, query))
      }

      pop_houses <-
        exogenous %>% dplyr::filter(.data$type == "population" |
                                      .data$type == "houses")
      idx <-
        which(
          vapply(pop_houses$centroid, function(x)
            check_pc_centroids(x), logical(1), USE.NAMES = FALSE) == FALSE
        )
      if (length(idx) > 0) {
        preflight_failed <- TRUE
        flog.error(
          paste0(
            "The following postcode centroids in the exogenous input are not valid: ",
            paste0(pop_houses$type[idx], ": ", pop_houses$centroid[idx],
                   collapse = ", ")
          )
        )
      }

      # If type is 'jobs' the value can be assigned to either a postcode
      # centroid or a workplace centroid.
      check_wp_centroids <- function(centroid) {
        query <- paste0(
          "select '",
          centroid,
          "' IN (select wz from data.workplace2011 union select postcode from
      data.pc_pop_2011)"
        )
        as.logical(sdr_dbGetQuery(con, query))
      }

      jobs <- exogenous %>% dplyr::filter(.data$type == "jobs")
      idx <-
        which(
          vapply(jobs$centroid, function(x)
            check_wp_centroids(x), logical(1), USE.NAMES = FALSE) == FALSE
        )
      if (length(idx) > 0) {
        preflight_failed <- TRUE
        flog.error(
          paste0(
            "The following workplace centroids in the exogenous input are not valid: ",
            paste0(jobs$type[idx], ": ", jobs$centroid[idx], collapse = ", ")
          )
        )
      }
    }

    # stop if any of the pre-flight checks have failed
    if (isTRUE(preflight_failed)) {
      stop("Pre-flight checks have failed - see log file")
    } else {
      flog.info("Pre-flight checks passed")
    }


    # Database setup----------------------------------------------------------------

    # set schema name to job_id.
    schema <- config$job_id

    # create db schema to hold model data
    query <- paste0("
                create schema ", schema)
    sdr_dbExecute(con, query)

    # write data.frame of proposed stations to PostgreSQL table
    dbWriteTable(
      conn = con,
      Id(schema = schema, table = "proposed_stations"),
      stations,
      append = FALSE,
      row.names = TRUE,
      field.types = c(
        ticketm = "text",
        busint = "text",
        cctv = "text",
        terminal = "text",
        electric = "text",
        tcbound = "text",
        category = "text"
      )
    )

    query <- paste0("
                alter table ",
                    schema,
                    ".proposed_stations rename \"row_names\"
                to id
                ")
    sdr_dbExecute(con, query)

    query <- paste0("
                alter table ",
                    schema,
                    ".proposed_stations alter column id type
                int using id::integer
                ")
    sdr_dbExecute(con, query)

    query <- paste0("
                alter table ",
                    schema,
                    ".proposed_stations add primary key (id)
                ")
    sdr_dbExecute(con, query)

    # create geometry column in proposed_stations table
    query <- paste0(
      "
  alter table ",
      schema,
      ".proposed_stations add column location_geom geometry(Point,27700)
  "
    )
    sdr_dbExecute(con, query)

    # populate location_geom
    query <- paste0(
      "
  update ",
      schema,
      ".proposed_stations set location_geom =
  ST_GeomFromText('POINT('||acc_east||' '||acc_north||')', 27700)
  "
    )
    sdr_dbExecute(con, query)

    flog.info("Proposed_stations table successfully created")

    # Prepare exogenous inputs------------------------------------------------------

    dbWriteTable(
      conn = con,
      Id(schema = schema, table = "exogenous_input"),
      exogenous,
      append =
        FALSE,
      row.names = TRUE,
      field.types = c(
        type = "text",
        number = "integer",
        centroid = "text"
      )
    )

    query <- paste0("
                alter table ",
                    schema,
                    ".exogenous_input rename \"row_names\" TO id
                ")
    sdr_dbExecute(con, query)

    query <- paste0("
                alter table ",
                    schema,
                    ".exogenous_input alter column id type int
                using id::integer
                ")
    sdr_dbExecute(con, query)

    query <- paste0("
                alter table ", schema, ".exogenous_input add primary key (id)
                ")
    sdr_dbExecute(con, query)

    # Add and populate geom column for the exogenous centroids.
    # Can be either a postcode centroid or a workplace centroid
    query <- paste0("
  alter table ",
                    schema,
                    ".exogenous_input add column geom geometry(Point,27700)
  ")
    sdr_dbExecute(con, query)

    query <- paste0(
      "
  with tmp as (
  select a.centroid, coalesce(b.geom, c.geom) as geom
  from ",
      schema,
      ".exogenous_input a
  left join data.pc_pop_2011 b on a.centroid = b.postcode
  left join data.workplace2011 c on a.centroid = c.wz
  )
  update ",
      schema,
      ".exogenous_input a
  set geom =
  (select distinct on (centroid) geom from tmp where a.centroid = tmp.centroid)
  "
    )
    sdr_dbExecute(con, query)

    # Create columns and populate data that will be used for adjusting postcode
    # probability weighted population

    # population column
    query <- paste0(
      "
  alter table ",
      schema,
      ".exogenous_input
  add column population int8,
  add column avg_hhsize numeric
  "
    )
    sdr_dbExecute(con, query)

    # Copy from number to population for type 'population'
    query <- paste0("
                update ",
                    schema,
                    ".exogenous_input set population = number where type
                ='population'
                ")
    sdr_dbExecute(con, query)

    # For type 'houses' we get the average household size for the local authority
    # in which the postcode is located, then calculate population and then populate
    # the population column and avg_hhsize column.
    query <- paste0(
      "
  with tmp as (
  select a.id, c.avg_hhsize_2019, a.number * c.avg_hhsize_2019 as population
  from ",
      schema,
      ".exogenous_input a
  left join data.pc_pop_2011 b
  on a.centroid = b.postcode
  left join data.hhsize c
  on b.oslaua = c.area_code
  where type = 'houses')
  update ",
      schema,
      ".exogenous_input a
  set population = tmp.population,
  avg_hhsize = tmp.avg_hhsize_2019
  from tmp
  where a.id = tmp.id
  "
    )
    sdr_dbExecute(con, query)

    flog.info("Exogenous table successfully created")


    # Create station service areas--------------------------------------------------

    # create distance-based service areas used in identifying nearest 10 stations to
    # each postcode centroid

    # Because there may be duplicate stations of the same name for sensitivity
    # analysis, we create the service areas for each unique station name in a
    # separate table and then update the proposed_stations table from that. While a
    # bit more complex, this removes unnecessary duplication of what can be a
    # relatively slow process for larger areas.

    flog.info("Starting to create station service areas")

    unique_stations <-
      stations %>%
      dplyr::distinct(.data$name, .keep_all = TRUE) %>%
      dplyr::select(.data$name, .data$location)

    dbWriteTable(
      conn = con,
      Id(schema = schema, table = "station_sas"),
      unique_stations,
      append =
        FALSE,
      row.names = FALSE
    )

    # Create service areas used during choiceset generation
    sdr_create_service_areas(
      con,
      out_path,
      schema = schema,
      df = unique_stations,
      identifier = "name",
      table = "station_sas",
      sa = c(
        1000,
        2000,
        3000,
        4000,
        5000,
        10000,
        20000,
        30000,
        40000,
        60000,
        80000,
        105000
      ),
      cost = "len"
    )

    # Create 2-minute service area - used to identify the number of jobs
    # within 2 minutes of the station
    sdr_create_service_areas(
      con,
      out_path,
      schema = schema,
      df = unique_stations,
      identifier = "name",
      table = "station_sas",
      sa = c(2),
      cost = "time"
    )

    if (testing) {
      sdr_create_service_areas(
        con,
        out_path,
        schema = schema,
        df = unique_stations,
        identifier = "name",
        table = "station_sas",
        sa = c(5),
        cost = "time"
      )

      query <- paste0(
        "
    alter table ",
        schema,
        ".station_sas rename column service_area_5mins to service_area_60mins
    "
      )
      sdr_dbExecute(con, query)

      query <- paste0("
                  update data.stations set service_area_60mins =
                  service_area_5mins
                  ")
      sdr_dbExecute(con, query)
    } else {
      query <- paste0("
                  update data.stations set service_area_60mins =
                  service_area_60mins_actual
                  ")
      sdr_dbExecute(con, query)

      # Create 60 minute service area - used to identify postcode centroids to be
      # considered for inclusion in model
      sdr_create_service_areas(
        con,
        out_path,
        schema = schema,
        df = unique_stations,
        identifier = "name",
        table = "station_sas",
        sa = c(60),
        cost = "time",
        target = 0.9
      )
    }

    # Add the required service area columns to proposed_stations table and
    # update the geometries from station_sas.
    # distance-based
    for (i in c(1000,
                2000,
                3000,
                4000,
                5000,
                10000,
                20000,
                30000,
                40000,
                60000,
                80000,
                105000)) {
      column_name <- paste0("service_area_", i / 1000, "km")
      query <-
        paste0(
          "alter table ",
          schema,
          ".proposed_stations add column if not exists ",
          column_name,
          " geometry(Polygon,27700);"
        )
      sdr_dbExecute(con, query)
      query <-
        paste0(
          "update ",
          schema,
          ".proposed_stations a set ",
          column_name,
          " = b.",
          column_name,
          " from ",
          schema,
          ".station_sas b where a.name = b.name"
        )
      sdr_dbExecute(con, query)
    }

    # time-based
    for (i in c(2, 60)) {
      column_name <- paste0("service_area_", i, "mins")
      query <-
        paste0(
          "alter table ",
          schema,
          ".proposed_stations add column if not exists ",
          column_name,
          " geometry(Polygon,27700);"
        )
      sdr_dbExecute(con, query)
      query <-
        paste0(
          "update ",
          schema,
          ".proposed_stations a set ",
          column_name,
          " = b.",
          column_name,
          " from ",
          schema,
          ".station_sas b where a.name = b.name"
        )
      sdr_dbExecute(con, query)
    }

    flog.info("Station service area generation completed")


    # Generate probability tables---------------------------------------------------

    flog.info("Starting to create choicesets and probability table(s)")

    if (isolation) {
      flog.info("Method is isolation")

      # which station names are duplicated?
      duplicates <- unique(stations$name[duplicated(stations$name)])

      # for crscodes where station name is NOT duplicated; straightforward
      for (crscode in stations$crscode[!(stations$name %in% duplicates)]) {
        choicesets <- sdr_generate_choicesets(con, schema, crscode)
        sdr_generate_probability_table(con, schema, choicesets, tolower(crscode))
        choicesets <- NULL
      }

      # for duplicated stations
      # only want to get the choicesets once per duplicated station
      for (name in duplicates) {
        # get the first crscode for a station with this name
        first_crs <- stations$crscode[stations$name == name][1]

        # generate the choiceset for that crs
        firstsets <-
          sdr_generate_choicesets(con, schema, first_crs)

        # generate probability table for first_crs
        sdr_generate_probability_table(con, schema, firstsets, tolower(first_crs))

        # then for every other crscode with this station name
        for (crscode in stations$crscode[stations$name == name][-1]) {
          # copy firstsets
          choicesets <- firstsets
          # update all occurrences of first_crs to this crscode
          choicesets$crscode[choicesets$crscode == first_crs] <-
            crscode
          # create probability table
          sdr_generate_probability_table(con, schema, choicesets, tolower(crscode))
        }
      }

      for (crscode in stations$crscode) {
        # make frequency group adjustments if required
        if (isTRUE(have_freqgroups) &&
            !is.na(stations$freqgrp[stations$crscode == crscode])) {
          df <-
            data.frame(fgrp = freqgroups[freqgroups$group_id ==
                                           stations$freqgrp[stations$crscode == crscode],
                                         "group_crs"], stringsAsFactors = FALSE)
          sdr_frequency_group_adjustment(con, schema, df, tolower(crscode))
        }
        # calculate the probabilities
        sdr_calculate_probabilities(con, schema, tolower(crscode))
      }
    } else { # end isolation
      # Concurrent method
      flog.info("Method is concurrent")
      choicesets <-
        sdr_generate_choicesets(con, schema, stations$crscode)
      sdr_generate_probability_table(con, schema, choicesets, "concurrent")

      # make frequency group adjustments if required
      # must either be no frequency group entered or the same frequency group for
      # all stations. The latter is checked during pre-flight, so just need to take
      # the frequency group for the first station (if it isn't NA).
      if (isTRUE(have_freqgroups) && !is.na(stations$freqgrp[1])) {
        df <-
          data.frame(fgrp = freqgroups[freqgroups$group_id == stations$freqgrp[1],
                                       "group_crs"], stringsAsFactors = FALSE)
        sdr_frequency_group_adjustment(con, schema, df, "concurrent")
      }
      sdr_calculate_probabilities(con, schema, "concurrent")
    } # end concurrent


    # Trip end model----------------------------------------------------------------

    # create and populate 2-minute workplace population column in proposed_stations
    query <- paste0("
                alter table ",
                    schema,
                    ".proposed_stations
                add column workpop_2min int8
                ")
    sdr_dbExecute(con, query)

    query <- paste0(
      "
  with tmp as (
  select a.crscode, coalesce(sum(b.population), 0) as sum
  from ",
      schema,
      ".proposed_stations a
  left join data.workplace2011 b
  on st_within(b.geom, a.service_area_2mins)
  group by crscode)
  update ",
      schema,
      ".proposed_stations a
  set workpop_2min =
  (select sum from tmp where tmp.crscode = a.crscode)
  "
    )
    sdr_dbExecute(con, query)

    # adjustments for proposed workplace population from exogenous input file
    # update workplace pop in proposed_stations table if any of the exogenous
    # centroids are within the proposed station's 2-minute service area

    if (isTRUE(have_exogenous)) {
      flog.info("Making adjustments to workplace population from exogenous_input")
      query <- paste0(
        "
  with tmp as (
  select a.crscode, b.centroid, b.number
  from ",
        schema,
        ".proposed_stations a
  left join ",
        schema,
        ".exogenous_input b
  on st_within(b.geom, a.service_area_2mins)
  where b.type = 'jobs'
  )
  update ",
        schema,
        ".proposed_stations a
  set workpop_2min = workpop_2min +
  (select coalesce(sum(number), 0) from tmp where a.crscode = tmp.crscode)
  "
      )
      sdr_dbExecute(con, query)
    }

    # calculate probability weighted population for each station
    # create column in proposed_stations table
    query <- paste0("
                alter table ",
                    schema,
                    ".proposed_stations
                add column prw_pop int8
                ")
    sdr_dbExecute(con, query)

    for (crscode in stations$crscode) {
      if (isolation) {
        prweighted_pop <-
          sdr_calculate_prweighted_population(con, schema, crscode, tolower(crscode))
      } else {
        prweighted_pop <-
          sdr_calculate_prweighted_population(con, schema, crscode, "concurrent")
      }

      query <- paste0(
        "
    update ",
        schema,
        ".proposed_stations set prw_pop = ",
        prweighted_pop,
        " where crscode = '",
        crscode,
        "'"
      )
      sdr_dbExecute(con, query)
    }


    # Create GeoJSON catchment maps-------------------------------------------------
    # create column in proposed_stations table
    query <- paste0("
                alter table ",
                    schema,
                    ".proposed_stations
                add column catchment json
                ")
    sdr_dbExecute(con, query)

    for (crscode in stations$crscode) {
      if (isolation) {
        sdr_create_json_catchment(con,
                                  schema,
                                  "proposed",
                                  pcpoly$exists,
                                  crscode,
                                  tolower(crscode),
                                  tolerance = 2)
      } else {
        sdr_create_json_catchment(con,
                                  schema,
                                  "proposed",
                                  pcpoly$exists,
                                  crscode,
                                  "concurrent",
                                  tolerance = 2)
      }
    }


    # Generate forecasts------------------------------------------------------------

    flog.info("Generating demand forecasts")
    query <- paste0(
      "
  alter table ",
      schema,
      ".proposed_stations
  add column forecast_base int8,
  add column forecast_uplift int8;
  "
    )
    sdr_dbExecute(con, query)

    # version with altered application of decay function
    # and 2 minute work population (1 mile)
    # Coefficients:
    #   Estimate Std. Error t value Pr(>|t|)
    # (Intercept)                  3.515281   0.094543  37.182  < 2e-16 ***
    #   log(te19cmb_15212_adj)       0.366328   0.018018  20.331  < 2e-16 ***
    #   log(dailyfrequency_2013_all) 1.124473   0.027686  40.615  < 2e-16 ***
    #   log1p(work_pop_2m)           0.056579   0.007915   7.149 1.27e-12 ***
    #   log1p(carspaces)             0.126181   0.009195  13.723  < 2e-16 ***
    #   electric_dummy               0.236574   0.041156   5.748 1.06e-08 ***
    #   tcard_bound_dummy            0.299954   0.091444   3.280  0.00106 **
    #   TerminusDummy                0.792108   0.083448   9.492  < 2e-16 ***
    #   ---
    #   Signif. codes:  0 ‘***’ 0.001 ‘**’ 0.01 ‘*’ 0.05 ‘.’ 0.1 ‘ ’ 1
    #
    # Residual standard error: 0.6919 on 1784 degrees of freedom
    # Multiple R-squared:  0.8502,	Adjusted R-squared:  0.8496
    # F-statistic:  1446 on 7 and 1784 DF,  p-value: < 2.2e-16
    #
    var_intercept <- 3.515281
    var_population <- 0.366328
    var_frequency <- 1.124473
    var_workpop <- 0.056579
    var_carspaces <- 0.126181
    var_electric <- 0.236574
    var_tcardbound <- 0.299954
    var_terminus <- 0.792108
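    # In plain form, the base forecast built in the SQL below is:
    #   trips = exp(var_intercept
    #               + ln(prw_pop + 1)      * var_population
    #               + ln(freq)             * var_frequency
    #               + ln(workpop_2min + 1) * var_workpop
    #               + ln(carsp + 1)        * var_carspaces
    #               + electric * var_electric
    #               + tcbound  * var_tcardbound
    #               + terminal * var_terminus)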

    # base forecast
    query <- paste0(
      "
  with tmp as (
  select id, round(exp(",
      var_intercept,
      " + (ln(prw_pop + 1) * ",
      var_population,
      ") + (ln(freq) * ",
      var_frequency,
      ") + (ln(workpop_2min + 1) * ",
      var_workpop,
      ") + (ln(carsp + 1) * ",
      var_carspaces,
      ")
  + (electric::boolean::int * ",
      var_electric,
      ") + (tcbound::boolean::int * ",
      var_tcardbound,
      ") + (terminal::boolean::int * ",
      var_terminus,
      ")))
  as forecast_base
  from ",
      schema,
      ".proposed_stations
  )
  update ",
      schema,
      ".proposed_stations a set forecast_base = tmp.forecast_base from tmp
  where a.id = tmp.id
  "
    )
    sdr_dbExecute(con, query)

    # regional-based uplift forecast
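    # i.e. forecast_uplift = round(forecast_base * (1 + pcchange / 100)),
    # with pcchange taken from data.regional_uplifts for the station's region.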
    query <- paste0(
      "
  with tmp as (
  select a.id, round(a.forecast_base + ((b.pcchange/100) * a.forecast_base))
  as forecast_uplift from ",
      schema,
      ".proposed_stations a
  left join data.regional_uplifts b
  on a.region = b.region
  )
  update ",
      schema,
      ".proposed_stations a set forecast_uplift = tmp.forecast_uplift from tmp
  where a.id = tmp.id
  "
    )
    sdr_dbExecute(con, query)

    # Abstraction analysis----------------------------------------------------------

    # Only process if abstraction analysis is required.
    if (length(unique(na.omit(stations$abstract))) > 0) {
      flog.info("Starting abstraction analysis")
      # Create the abstraction results table
      query <- paste0(
        "create table ",
        schema,
        ".abstraction_results (
    id serial primary key,
    proposed text,
    proposed_name text,
    at_risk text,
    prwpop_before int,
    prwpop_after int,
    change real,
    pc_change real,
    entsexits integer,
    adj_trips integer,
    trips_change integer,
    catchment_before json,
    catchment_after json
  )"
      )
      sdr_dbExecute(con, query)

      # populate abstraction results table with proposed (or 'concurrent') and
      # at_risk stations
      if (isTRUE(isolation)) {
        # for proposed stations that require an abstraction analysis
        for (proposed in stations$crscode[!is.na(stations$abstract)]) {
          # for each at-risk station specified for this proposed station
          for (at_risk in unlist(strsplit(stations$abstract[stations$crscode ==
                                                            proposed], ",")))
          {
            query <- paste0(
              "insert into ",
              schema,
              ".abstraction_results (id, proposed, proposed_name, at_risk)
          values (default,'",
              proposed,
              "','",
              stations$name[stations$crscode == proposed],
              "','",
              at_risk,
              "')"
            )
            sdr_dbExecute(con, query)
          }
        }
      } else {
        # for concurrent mode, at-risk stations must be the same for every station
        # so just get them from the first proposed station (if not NA)
        for (at_risk in na.omit(unlist(strsplit(stations$abstract[1], ","))))
        {
          query <- paste0(
            "insert into ",
            schema,
            ".abstraction_results (id, proposed, at_risk) values (default,'concurrent','",
            at_risk,
            "')"
          )
          sdr_dbExecute(con, query)
        }
      }

      # populate abstraction_results table with entsexits for each at-risk station
      query <- paste0(
        "	update ",
        schema,
        ".abstraction_results a
    set entsexits = b.entsexits
    from data.stations b
    where a.at_risk = b.crscode"
      )
      sdr_dbExecute(con, query)

      # create BEFORE probability tables--------------------------------------------

      # Get unique at_risk stations first
      unique_atrisk <-
        unique(na.omit(unlist(strsplit(
          stations$abstract, ","
        ))))

      # Get eastings and northings for these stations
      query <- paste0(
        "
    select crscode, easting || ',' || northing as location from data.stations where
    crscode in (",
        paste(
          "'",
          unique_atrisk,
          "'",
          sep = "",
          collapse = ","
        ),
        ")
    "
      )
      unique_atrisk <- sdr_dbGetQuery(con, query)

      flog.info("Creating BEFORE choicesets and probability tables")
      for (at_risk in unique_atrisk$crscode) {
        # generate the choiceset for this at_risk station
        # only want to do this once for each at_risk.
        choicesets <-
          sdr_generate_choicesets(con, schema, at_risk, existing = TRUE)
        # Isolation mode
        if (isTRUE(isolation)) {
          # for each proposed station
          for (crscode in stations$crscode) {
            # check if this at_risk is in stations$abstract for this crscode
            if (grepl(at_risk, stations$abstract[stations$crscode == crscode], fixed = TRUE)) {
              # Generate probability table
              sdr_generate_probability_table(con,
                                             schema,
                                             choicesets,
                                             paste0(
                                               tolower(at_risk),
                                               "_before_abs_",
                                               tolower(crscode)
                                             ))
              # Make frequency group adjustments
              if (isTRUE(have_freqgroups) &&
                  !is.na(stations$freqgrp[stations$crscode == crscode])) {
                df <-
                  data.frame(fgrp = freqgroups[freqgroups$group_id
                                               == stations$freqgrp[stations$crscode == crscode],
                                               "group_crs"], stringsAsFactors = FALSE)
                sdr_frequency_group_adjustment(con,
                                               schema,
                                               df,
                                               paste0(
                                                 tolower(at_risk),
                                                 "_before_abs_",
                                                 tolower(crscode)
                                               ))
              }
              # calculate probabilities
              sdr_calculate_probabilities(con,
                                          schema,
                                          paste0(
                                            tolower(at_risk),
                                            "_before_abs_",
                                            tolower(crscode)
                                          ))
              # get probability weighted population
              prweighted_pop_before <-
                sdr_calculate_prweighted_population(con,
                                                    schema,
                                                    at_risk,
                                                    paste0(
                                                      tolower(at_risk),
                                                      "_before_abs_",
                                                      tolower(crscode)
                                                    ))
              # update abstraction_results table
              query <- paste0(
                "update ",
                schema,
                ".abstraction_results set prwpop_before = ",
                prweighted_pop_before,
                " where at_risk = '",
                at_risk,
                "' and proposed = '",
                crscode,
                "'"
              )
              sdr_dbExecute(con, query)
              # generate GeoJSON catchments
              sdr_create_json_catchment(
                con,
                schema,
                "abstraction",
                pcpoly$exists,
                at_risk,
                paste0(
                  tolower(at_risk),
                  "_before_abs_",
                  tolower(crscode)
                ),
                tolerance = 2
              )
            } # end at_risk is in stations$abstract for this crscode
          } # end crscode in stations
        } else {
          # Now for concurrent (we are still in the unique at_risk loop)
          # Generate probability table
          sdr_generate_probability_table(con,
                                         schema,
                                         choicesets,
                                         paste0(tolower(at_risk),
                                                "_before_abs_concurrent"))
          # Make frequency group adjustments
          # Only need to look at first row as must be same for all stations
          if (isTRUE(have_freqgroups) &&
              !is.na(stations$freqgrp[1])) {
            df <-
              data.frame(fgrp = freqgroups[freqgroups$group_id ==
                                             stations$freqgrp[1], "group_crs"],
                         stringsAsFactors = FALSE)
            sdr_frequency_group_adjustment(con,
                                           schema,
                                           df,
                                           paste0(tolower(at_risk),
                                                  "_before_abs_concurrent"))
          }
          # calculate probabilities
          sdr_calculate_probabilities(con,
                                      schema,
                                      paste0(tolower(at_risk),
                                             "_before_abs_concurrent"))
          # get probability weighted population
          prweighted_pop_before <-
            sdr_calculate_prweighted_population(con,
                                                schema,
                                                at_risk,
                                                paste0(tolower(at_risk),
                                                       "_before_abs_concurrent"))

          # update abstraction_results table
          query <- paste0(
            "update ",
            schema,
            ".abstraction_results set prwpop_before = ",
            prweighted_pop_before,
            " where at_risk = '",
            at_risk,
            "' and proposed = 'concurrent'"
          )
          sdr_dbExecute(con, query)
          # generate before GeoJSON catchments
          sdr_create_json_catchment(
            con,
            schema,
            "abstraction",
            pcpoly$exists,
            at_risk,
            paste0(tolower(at_risk), "_before_abs_concurrent"),
            tolerance = 2
          )
        } # end concurrent
      } # end for each unique at_risk station

      # Create AFTER probability tables---------------------------------------------
      flog.info("Creating AFTER choicesets and probability tables")
      # For isolation mode we loop through each proposed station that requires
      # abstraction analysis (i.e. not NA)
      if (isTRUE(isolation)) {
        for (proposed in stations$crscode[!is.na(stations$abstract)]) {
          # Then use a nested loop for each of the at-risk stations.
          for (at_risk in unlist(strsplit(stations$abstract[stations$crscode
                                                            == proposed], ",")))
          {
            choicesets <-
              sdr_generate_choicesets(con,
                                      schema,
                                      proposed,
                                      existing = FALSE,
                                      abs_crs = at_risk)

            # Generate probability table
            sdr_generate_probability_table(con,
                                           schema,
                                           choicesets,
                                           paste0(tolower(at_risk),
                                                  "_after_abs_",
                                                  tolower(proposed)))
            # make frequency group adjustments if required
            if (isTRUE(have_freqgroups) &&
                !is.na(stations$freqgrp[stations$crscode == proposed])) {
              df <-
                data.frame(fgrp = freqgroups[freqgroups$group_id ==
                                               stations$freqgrp[stations$crscode == proposed],
                                             "group_crs"], stringsAsFactors = FALSE)
              sdr_frequency_group_adjustment(con,
                                             schema,
                                             df,
                                             paste0(
                                               tolower(at_risk),
                                               "_after_abs_",
                                               tolower(proposed)
                                             ))
            }
            # calculate probabilities
            sdr_calculate_probabilities(con,
                                        schema,
                                        paste0(tolower(at_risk),
                                               "_after_abs_",
                                               tolower(proposed)))
            # get probability weighted population
            prweighted_pop_after <-
              sdr_calculate_prweighted_population(con,
                                                  schema,
                                                  at_risk,
                                                  paste0(tolower(at_risk),
                                                         "_after_abs_",
                                                         tolower(proposed)))
            # update abstraction_results table
            query <- paste0(
              "update ",
              schema,
              ".abstraction_results
          set prwpop_after = ",
              prweighted_pop_after,
              " where proposed = '",
              proposed,
              "' and at_risk = '",
              at_risk,
              "'"
            )
            sdr_dbExecute(con, query)
            query <- paste0(
              "update ",
              schema,
              ".abstraction_results
          set change = prwpop_after - prwpop_before,
          pc_change = ((prwpop_after - prwpop_before::real) / prwpop_before) * 100
          where proposed = '",
              proposed,
              "' and at_risk = '",
              at_risk,
              "'"
            )
            sdr_dbExecute(con, query)
            # generate GeoJSON catchments
            sdr_create_json_catchment(
              con,
              schema,
              "abstraction",
              pcpoly$exists,
              at_risk,
              paste0(tolower(at_risk),
                     "_after_abs_",
                     tolower(proposed)),
              proposed,
              tolerance = 2
            )
          }
        }
      } else {
        # For the concurrent method, abstraction entry MUST be same for all proposed
        # stations when submitted (this is checked in pre-flight). So we just need to
        # get the entry for the first row - if not NA - and loop through each of the
        # at_risk stations.
        for (at_risk in na.omit(unlist(strsplit(stations$abstract[1], ",")))) {
          # for each at-risk station we pass all the proposed stations to the function
          # - as all would be present in the after situation in concurrent mode
          choicesets <-
            sdr_generate_choicesets(con,
                                    schema,
                                    stations$crscode,
                                    existing = FALSE,
                                    abs_crs = at_risk)
          # Generate probability table
          sdr_generate_probability_table(con,
                                         schema,
                                         choicesets,
                                         paste0(tolower(at_risk),
                                                "_after_abs_concurrent"))
          # Make frequency group adjustments if required.
          # Only a single identical frequency group for all stations under
          # concurrent treatment. This is checked during pre-flight. So we just use
          # the first row
          if (isTRUE(have_freqgroups) &&
              !is.na(stations$freqgrp[1])) {
            df <-
              data.frame(fgrp = freqgroups[freqgroups$group_id ==
                                             stations$freqgrp[1], "group_crs"],
                         stringsAsFactors = FALSE)
            sdr_frequency_group_adjustment(con,
                                           schema,
                                           df,
                                           paste0(tolower(at_risk),
                                                  "_after_abs_concurrent"))
          }
          # calculate probabilities
          sdr_calculate_probabilities(con,
                                      schema,
                                      paste0(tolower(at_risk),
                                             "_after_abs_concurrent"))
          # get probability weighted population
          prweighted_pop_concurrent <-
            sdr_calculate_prweighted_population(con,
                                                schema,
                                                at_risk,
                                                paste0(tolower(at_risk),
                                                       "_after_abs_concurrent"))
          # update abstraction_results table
          query <- paste0(
            "update ",
            schema,
            ".abstraction_results
        set prwpop_after = ",
            prweighted_pop_concurrent,
            " where proposed = 'concurrent' and at_risk = '",
            at_risk,
            "'"
          )
          sdr_dbExecute(con, query)
          query <- paste0(
            "update ",
            schema,
            ".abstraction_results
        set change = prwpop_after - prwpop_before,
        pc_change = ((prwpop_after - prwpop_before::real) / prwpop_before) * 100
        where proposed = 'concurrent' and at_risk = '",
            at_risk,
            "'"
          )
          sdr_dbExecute(con, query)
          # generate GeoJSON catchments
          sdr_create_json_catchment(
            con,
            schema,
            "abstraction",
            pcpoly$exists,
            at_risk,
            paste0(tolower(at_risk), "_after_abs_concurrent"),
            "concurrent",
            tolerance = 2
          )
        }
      } # end concurrent

      # For concurrent and isolation modes
      # Calculate change between the before and after situation
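      # i.e. adj_trips = round(entsexits * (1 + pc_change / 100)) and
      # trips_change = adj_trips - entsexits.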
      query <- paste0(
        "update ",
        schema,
        ".abstraction_results
    set adj_trips = round(entsexits + ((pc_change / 100) * entsexits))"
      )
      sdr_dbExecute(con, query)
      query <- paste0("update ",
                      schema,
                      ".abstraction_results
    set trips_change = adj_trips - entsexits")
      sdr_dbExecute(con, query)

    } # end abstraction analysis

    # Pulling outputs---------------------------------------------------------------

    flog.info("Retrieving station forecast(s) from database")

    # stations

    query <-
      paste0(
        "select crscode as id, name, region,
           stn_east as station_easting, stn_north as station_northing,
           acc_east as access_easting, acc_north as access_northing,
           freq as frequency, freqgrp as frequency_group,
           carsp as parking_spaces, ticketm as ticket_machine,
           busint as bus_interchange, cctv, terminal as terminal_station,
           tcbound as travelcard_boundary, category,
           abstract as abstraction_stations,
           workpop_2min as work_population_2mins,
           prw_pop as weighted_population, forecast_base, forecast_uplift
         from ",
        schema,
        ".proposed_stations"
      )
    out_stations <- sdr_dbGetQuery(con, query)
    write.csv(
      out_stations,
      file.path(out_path, "station_forecast.csv", fsep = .Platform$file.sep),
      row.names = FALSE
    )

    # abstraction analysis

    if (length(unique(na.omit(stations$abstract))) > 0) {
      flog.info("Retrieving abstraction analysis from database")
      query <- paste0(
        "select proposed as proposed_id, proposed_name,
           at_risk as impacted_station, prwpop_before, prwpop_after,
           change, pc_change, entsexits as entries_exits,
           adj_trips, trips_change
         from ",
        schema,
        ".abstraction_results"
      )
      out_abstract <- sdr_dbGetQuery(con, query)
      write.csv(
        out_abstract,
        file.path(out_path, "abstraction_analysis.csv", fsep = .Platform$file.sep),
        row.names = FALSE
      )
    }

    # Exogenous table
    query <- paste0(
      "select type, number, centroid, population, avg_hhsize from ",
      schema,
      ".exogenous_input"
    )
    out_exogenous <- sdr_dbGetQuery(con, query)
    write.csv(
      out_exogenous,
      file.path(out_path, "exogenous_inputs.csv", fsep = .Platform$file.sep),
      row.names = FALSE
    )

    # Station catchments

    flog.info("Retrieving station catchment(s)")

    query <-
      paste0("select crscode, catchment from ",
             schema,
             ".proposed_stations")
    out_catchments <- sdr_dbGetQuery(con, query)

    for (i in seq_len(nrow(out_catchments))) {
      catchment <- out_catchments$catchment[i]
      crscode <- out_catchments$crscode[i]
      write(
        catchment,
        file.path(
          out_path,
          paste0(tolower(crscode), "_catchment.geojson"),
          fsep = .Platform$file.sep
        )
      )
    }

    # Abstraction catchments

    if (length(unique(na.omit(stations$abstract))) > 0) {
      flog.info("Retrieving abstraction analysis catchments")
      query <- paste0(
        "select proposed, at_risk, catchment_before, catchment_after from ",
        schema,
        ".abstraction_results"
      )
      out_abscatchments <- sdr_dbGetQuery(con, query)

      for (i in seq_len(nrow(out_abscatchments))) {
        at_risk <- out_abscatchments$at_risk[i]
        proposed <- out_abscatchments$proposed[i]
        write(
          out_abscatchments$catchment_before[i],
          file.path(
            out_path,
            paste0(
              tolower(proposed),
              "_",
              tolower(at_risk),
              "_catchment_before.geojson"
            ),
            fsep = .Platform$file.sep
          )
        )
        write(
          out_abscatchments$catchment_after[i],
          file.path(
            out_path,
            paste0(
              tolower(proposed),
              "_",
              tolower(at_risk),
              "_catchment_after.geojson"
            ),
            fsep = .Platform$file.sep
          )
        )
      }

    }

    # Closing actions---------------------------------------------------------------

    flog.info("Tidying up")

    stopCluster(cl)
    dbDisconnect(con)

    flog.info("Job finished")

  }