R/spew.R

Defines functions call_spew spew spew_seq spew_sock spew_mc spew_mpi spew_place ccount people_to_households write_data write_pop_table write_schools write_workplaces print_region_list

Documented in call_spew ccount people_to_households print_region_list spew spew_mc spew_mpi spew_place spew_seq spew_sock write_data write_pop_table write_schools write_workplaces

#' Wrapper for reading, formatting, and writing SPEW ecosystems 
#' 
#' Generates SPEW synthetic ecosystems on the Olympus Computing Cluster 
#' 
#' @param input_dir character specifying input directory
#' @param output_dir charcater specifying output directory 
#' @param folders list specifying sub-directories for each data-source 
#' @param data_group character, either "US", "ipums", or "none"
#' @param run_type Whether to run sequentially in parallel. Default is "SEQ", for 
#' a sequential run. If parallel, back-end is either "MPI", "SOCK", or "MC"
#' @param output_type Default is "console" if we want to resulting population 
#' as an R variable. Alternative is "write", which is used on Olympus for writing 
#' out .csv files of the population
#' @param convert_count logical meant to indicate if we are going to convert 
#' population totals from people to household counts. Default: FALSE, assumes
#' the population is the total number of households 
#' @param vars list with two elements: household and person. This specifies 
#' which variables to include in the corresponding household and person PUMS data-set
#' @param sampling_method character indicating the type of sampling 
#' method to use, defaults to "uniform". Can also be "ipf" with appropriate marginal data. 
#' @param locations_method character indicating the type of location 
#' sampling to use, defaults to "uniform", can also be "roads". 
#' @param outfile_loc Defaults to "", so we print out the parallel run information. 
#' Only set to "/dev/null" for internal testing putposes.
#' @param road_noise Noise added to households during road-based sampling 
#' @param timer logical indicating we want to time the run 
#' @param verbose logical indicating we want to print output during the run.  Default is FALSE.#' 
#' 
#' @export
#' 
#' @return logical indicating whether or not this run of spew ended successfully 
call_spew <- function(input_dir, output_dir, folders = NULL, data_group = "US", run_type = "SEQ",
                      sampling_method = "uniform", locations_method = "uniform", output_type = "write", 
                      convert_count = FALSE, vars = list(household = NA, person = NA),
                      road_noise = .0002, outfile_loc = "", timer = TRUE, verbose = TRUE) {
  if (timer) { spew_start_time <- Sys.time() }
    
  # Read and format data from olympus directories ---
  data_list <- read_data(input_dir = input_dir, folders = folders, data_group = data_group, vars = vars)
  formatted_data <- format_data(data_list = data_list, data_group = data_group)

  # Call the SPEW algorithm on the formatted data ---
  spew(pop_table = formatted_data$pop_table, shapefile = formatted_data$shapefiles, 
       pums_h = formatted_data$pums$pums_h, pums_p = formatted_data$pums$pums_p,
       schools = formatted_data$schools, workplaces = formatted_data$workplaces, 
       marginals = formatted_data$marginals, output_type = output_type, output_dir = output_dir, 
       locations_method = locations_method, convert_count = convert_count,
       sampling_method = sampling_method, run_type = run_type, outfile_loc = outfile_loc, 
       road_noise = road_noise, timer = timer, verbose = verbose)

  # Print out the overall run-time of SPEW ---
  if (timer) {
    spew_time <- difftime(Sys.time(), spew_start_time, units = "secs")
    spew_time <- round(spew_time, digits = 2)
  }
  
  if (verbose) {
    spew_statement <- paste0("SPEW Runs in: ", spew_time)
    print(spew_statement)
  }
  
  return(TRUE)
}

#' SPEW algorithm to generate synthetic ecosystems
#' 
#' @param pop_table dataframe where rows correspond to places where populations 
#' should be generated. Other requird columns are "n_house" and "puma_id"
#' @param shapefile sp class object used for assigning households to 
#' particular locations  
#' @param pums_h dataframe with microdata corresponding to housegolds 
#' @param pums_p dataframe with microdata corresponding to people
#' @param schools list with names "public" and "private" with a 
#' dataframe of schools corresponding to public or private, respectively
#' @param workplaces dataframe of workplaces with a workplace_id column, 
#' employees column, and stcotr column
#' @param marginals list of marginal population totals. Each element of the 
#' list contains the marginal totals of a separate variable 
#' @param output_type Default is "console" if we want to resulting population 
#' as an R variable. Alternative is "write", which is used on Olympus for writing 
#' out .csv files of the population
#' @param output_dir character specifying ecosystem directory write to. Only used if
#' output_type = "write"
#' @param convert_count logical meant to indicate if we are going to convert 
#' population totals from people to household counts. Default: FALSE, assumes
#' the population is the total number of households 
#' @param run_type Whether to run sequentially in parallel. Default is "SEQ", for 
#' a sequential run. If parallel, back-end is either "MPI", "SOCK", or "MC"
#' @param sampling_method character indicating the type of sampling 
#' method to use, defaults to "uniform". Can also be "ipf" with appropriate marginal data. 
#' @param locations_method character indicating the type of location 
#' sampling to use, defaults to "uniform", can also be "roads". 
#' @param outfile_loc Defaults to "", so we print out the parallel run information. 
#' Only set to "/dev/null" for internal testing putposes.
#' @param road_noise Noise added to households during road-based sampling 
#' @param timer logical indicating we want to time the run 
#' @param verbose logical indicating we want to print output during the run.  Default is FALSE.
#' 
#' @export
#' 
#' @return logical indicating whether or not this run of spew ended successfully 
#' 
#' @examples 
#' # Call spew with default data from tartanville ---
#' data(tartanville)
#' tartanville_syneco <- spew(pop_table = tartanville$pop_table, 
#'                            shapefile = tartanville$shapefile, 
#'                            pums_h = tartanville$pums_h, 
#'                            pums_p = tartanville$pums_p)
#'                            
#' # Call spew with road-based location sampling ---
#' roads_shapefile <- list(boundaries = tartanville$shapefile, 
#'                         roads = tartanville$roads)                            
#' tartanville_syneco_roads <- spew(tartanville$pop_table, roads_shapefile,
#'                                  tartanville$pums_h, tartanville$pums_p,
#'                                  locations_method = "roads", road_noise = .05)
#' 
#' # Call spew with ipf agent-sampling ---
#' 
#' # Household income marginal 
#' var_name <- "HHINC"
#' type <- "ord"
#' bounds <- data.frame(lower = c(0, 50), upper = c(49, Inf))
#' category_name <- c("HHINC_0-49", "HHINC_50-Inf")
#' df <- data.frame(place_id = paste0("T", 1:9),  v1 = c(30, 0, 5, 10, 13, 9, 2, 1, 5))
#' df$v2 <- tartanville$pop_table$n_house - df$v1
#' ipf_obj_hhinc<- make_ipf_obj(var_name, type, bounds, category_name, df = df)
#' # Head of Household Race Marginal 
#' var_name <- c("RAC1P")
#' type <- "cat"
#' bounds <- data.frame(lower = c(1, 2), upper = c(1, 2))
#' category_name <- c("Tartan", "Argyle")
#' df2 <- data.frame(place_id = paste0("T", 1:9),  v1 = c(28, 0, 4, 1, 5, 8, 2, 1, 3))
#' df2$v2 <- tartanville$pop_table$n_house - df2$v1
#' ipf_obj_rac1p <- make_ipf_obj(var_name, type, bounds, category_name, df = df2)
#' ipf_obj <- list(HHINC = ipf_obj_hhinc[[1]], RAC1P = ipf_obj_rac1p[[1]])
#' supplementary_data <- list(moments = ipf_obj)
#' 
#' tartanville_syneco_ipf <- spew(tartanville$pop_table, tartanville$shapefile,
#'                               tartanville$pums_h, tartanville$pums_p,
#'                                marginals = supplementary_data$moments, 
#'                               sampling_method = "ipf")
#' 
#' # Call spew with moment-matching agent-sampling 
#' NP_avg <- c(3.2, 0, 6.0, 2.0, 3.2, 3.1, 4.0, 4.8, 3.9)
#' supplementary_data <- list(moments = make_mm_obj(moments_list = 
#'                            list(mom1 = data.frame(place_id = paste0("T", 1:9), 
#'                                 puma_id = "T", NP = NP_avg)), 
#'                            assumption = "independence", nMom = 1, type = "cont"))
#' tartanville_syneco_mm <- spew(pop_table = tartanville$pop_table, 
#'                              shapefile = tartanville$shapefile,
#'                              pums_h = tartanville$pums_h, 
#'                              pums_p = tartanville$pums_p,
#'                              marginals = supplementary_data$moments, 
#'                              sampling_method = "mm")
#'
spew <- function(pop_table, shapefile, pums_h, pums_p, 
                 schools = NULL, workplaces = NULL, marginals = NULL, 
                 output_type = "console", output_dir = NULL, convert_count = FALSE, 
                 run_type = "SEQ", sampling_method = "uniform", locations_method = "uniform", 
                 outfile_loc = "", road_noise = .0002, timer = FALSE, verbose = FALSE) {
  if (timer == TRUE) { location_start_time <- Sys.time() }
  
  # Write out both the pop-table and environments ---
  if (output_type == "write") {
    if (is.null(output_dir)) { 
      stop("If output_type = 'write', must specify output_dir") 
    }
  
    if (!is.null(output_dir)) { dir.create(output_dir, recursive = TRUE) }
    write_pop_table(pop_table, output_dir)
    
    env_dir <- file.path(output_dir, "environments")
    if (!dir.exists(env_dir)) { dir.create(env_dir, recursive = TRUE) }
    if (!is.null(schools)) { write_schools(schools, env_dir) }
    if (!is.null(workplaces)) { write_workplaces(workplaces, env_dir) }
  } 
  
  # Call one of the SPEW run-types ---

  # Set the objects to send to the workers if parallel back-end is SOCK or MPI 
  export_objects <- c("spew_place", "people_to_households", "sample_households", 
                      "sample_locations", "sample_people", "write_data", 
                      "people_to_households", "assign_schools", "assign_schools_inner", 
                      "weight_dists", "get_dists", "haversine", "subset_schools", 
                      "assign_workplaces", "assign_workplaces_inner", "remove_holes", 
                      "sample_locations_uniform", "sample_locations_roads", 
                      "subset_shapes_roads", "samp_roads", "print_region_list", "read_roads", 
                      "sample_uniform", "sample_ipf", "subset_pums", "align_pums", 
                      "fill_cont_table", "update_freqs", "get_targets", "sample_with_cont", 
                      "samp_ipf_inds", "assign_weights", "calc_dists", "get_ord_dists", 
                      "get_cat_dists", "remove_excess","sample_mm", "solve_mm_weights", 
                      "solve_mm_for_joint","solve_mm_for_var", "extrapolate_probs_to_pums",
                      "extrapolate_probs_to_pums_joint", "make_mm_obj", "impute_missing_vals", 
                      "ccount", "read_shapespatial_to_ogr")
  
  # Call either the sequential, or parallel version of the SPEW algorithm 
  num_places <- nrow(pop_table)
  if (run_type == "SEQ") {
    region_list <- spew_seq(num_places = num_places, pop_table = pop_table, shapefile = shapefile, 
                            pums_h = pums_h, pums_p = pums_p, schools = schools, 
                            workplaces = workplaces, marginals = marginals, output_type = output_type, 
                            output_dir = output_dir, convert_count = convert_count, 
                            sampling_method = sampling_method, locations_method = locations_method, 
                            outfile_loc = outfile_loc, export_objects = export_objects, 
                            road_noise = road_noise, timer = timer, verbose = verbose)

  } else if (run_type == "MPI") {
    region_list <- spew_mpi(num_places = num_places, pop_table = pop_table, shapefile = shapefile, 
                            pums_h = pums_h, pums_p = pums_p, schools = schools, 
                            workplaces = workplaces, marginals = marginals, output_type = output_type, 
                            output_dir = output_dir, convert_count = convert_count, 
                            sampling_method = sampling_method, locations_method = locations_method, 
                            outfile_loc = outfile_loc, export_objects = export_objects, 
                            road_noise = road_noise, timer = timer, verbose = verbose)
    
  } else if (run_type == "SOCK") {
    region_list <- spew_sock(num_places = num_places, pop_table = pop_table, shapefile = shapefile, 
                             pums_h = pums_h, pums_p = pums_p, schools = schools, 
                             workplaces = workplaces, marginals = marginals, output_type = output_type, 
                             output_dir = output_dir, convert_count = convert_count, 
                             sampling_method = sampling_method, locations_method = locations_method, 
                             outfile_loc = outfile_loc, export_objects = export_objects, 
                             road_noise = road_noise, timer = timer, verbose = verbose)
    
  } else if (run_type == "MC") {
    region_list <- spew_mc(num_places = num_places, pop_table = pop_table, shapefile = shapefile, 
                           pums_h = pums_h, pums_p = pums_p, schools = schools, 
                           workplaces = workplaces, marginals = marginals, output_type = output_type, 
                           output_dir = output_dir, convert_count = convert_count, 
                           sampling_method = sampling_method, locations_method = locations_method, 
                           outfile_loc = outfile_loc, export_objects = export_objects, 
                           road_noise = road_noise, timer = timer, verbose = verbose)
    
  } else {
    stop("run_type must be SEQ, MPI, SOCK, or MC")
  }

  # Print the diagnostics and summaries of the entire place
  if (verbose == TRUE) { print_region_list(region_list) }

  if (timer == TRUE) {
    location_time <- difftime(Sys.time(), location_start_time, units = "secs")
    location_time <- round(location_time, digits = 2)  
    location_time_statement <- paste0("Location runs in: ", location_time)
    print(location_time_statement)
  }  
  
  return(region_list)
}

#' Run SPEW Sequentially 
#' 
#' Internal function, only called by the main spew function
#' 
#' @param num_places The number of regions (place_ids) to be generated for a given location 
#' @param pop_table dataframe where rows correspond to places where populations 
#' should be generated. Other requird columns are "n_house" and "puma_id"
#' @param shapefile sp class object used for assigning households to 
#' particular locations  
#' @param pums_h dataframe with microdata corresponding to housegolds 
#' @param pums_p dataframe with microdata corresponding to people
#' @param schools list with names "public" and "private" with a 
#' dataframe of schools corresponding to public or private, respectively
#' @param workplaces dataframe of workplaces with a workplace_id column, 
#' employees column, and stcotr column
#' @param marginals list of marginal population totals. Each element of the 
#' list contains the marginal totals of a separate variable 
#' @param output_type Default is "console" if we want to resulting population 
#' as an R variable. Alternative is "write", which is used on Olympus for writing 
#' out .csv files of the population
#' @param output_dir directory to write output if output_type = "write", NULL otherwise 
#' @param convert_count logical meant to indicate if we are going to convert 
#' population totals from people to household counts. Default: FALSE, assumes
#' the population is the total number of households 
#' @param sampling_method character indicating the type of sampling 
#' method to use, defaults to "uniform". Can also be "ipf" with appropriate marginal data. 
#' @param locations_method character indicating the type of location 
#' sampling to use, defaults to "uniform", can also be "roads". 
#' @param outfile_loc Defaults to "", so we print out the parallel run information. 
#' Only set to "/dev/null" for internal testing putposes.
#' @param export_objects objects for exporting to parallel backends 
#' @param road_noise Noise added to households during road-based sampling 
#' @param timer logical indicating we want to time the run 
#' @param verbose logical indicating we want to print output during the run.  Default is FALSE.#' 
#' 
#' @return region_list with output_type for all num_place locations 
spew_seq <- function(num_places, pop_table, shapefile, pums_h, pums_p, 
                     schools, workplaces, marginals, output_type, output_dir, 
                     convert_count, sampling_method, locations_method, outfile_loc, 
                     export_objects, road_noise, timer, verbose) {
  
  region_list <- vector(mode = "list", length = num_places)
  for (place in 1:num_places) {
    if (verbose == TRUE) { print(paste0("Region ", place, " out of ", num_places)) }
    region_list[[place]] <- spew_place(index = place, 
                                       pop_table = pop_table, 
                                       shapefile = shapefile, 
                                       pums_h = pums_h, 
                                       pums_p = pums_p, 
                                       schools = schools, 
                                       workplaces = workplaces, 
                                       marginals = marginals, 
                                       output_type = output_type,
                                       sampling_method = sampling_method, 
                                       locations_method = locations_method, 
                                       convert_count = convert_count, 
                                       output_dir = output_dir,
                                       road_noise = road_noise, 
                                       timer = timer, 
                                       verbose = verbose)
  }
  
  return(region_list)
}


#' Run SPEW in Parallel with a SOCK backend 
#' 
#' Internal function, only called by the main spew function
#' 
#' @param num_places The number of regions (place_ids) to be generated for a given location 
#' @param pop_table dataframe where rows correspond to places where populations 
#' should be generated. Other requird columns are "n_house" and "puma_id"
#' @param shapefile sp class object used for assigning households to 
#' particular locations  
#' @param pums_h dataframe with microdata corresponding to housegolds 
#' @param pums_p dataframe with microdata corresponding to people
#' @param schools list with names "public" and "private" with a 
#' dataframe of schools corresponding to public or private, respectively
#' @param workplaces dataframe of workplaces with a workplace_id column, 
#' employees column, and stcotr column
#' @param marginals list of marginal population totals. Each element of the 
#' list contains the marginal totals of a separate variable 
#' @param output_type Default is "console" if we want to resulting population 
#' as an R variable. Alternative is "write", which is used on Olympus for writing 
#' out .csv files of the population
#' @param output_dir directory to write output if output_type = "write", NULL otherwise 
#' @param convert_count logical meant to indicate if we are going to convert 
#' population totals from people to household counts. Default: FALSE, assumes
#' the population is the total number of households 
#' @param sampling_method character indicating the type of sampling 
#' method to use, defaults to "uniform". Can also be "ipf" with appropriate marginal data. 
#' @param locations_method character indicating the type of location 
#' sampling to use, defaults to "uniform", can also be "roads". 
#' @param outfile_loc Defaults to "", so we print out the parallel run information. 
#' Only set to "/dev/null" for internal testing putposes.
#' @param export_objects objects for exporting to parallel backends 
#' @param road_noise Noise added to households during road-based sampling 
#' @param timer logical indicating we want to time the run 
#' @param verbose logical indicating we want to print output during the run.  Default is FALSE.#' 
#' 
#' @return region_list with output_type for all num_place locations 
spew_sock <- function(num_places, pop_table, shapefile, pums_h, pums_p, 
                      schools, workplaces, marginals, output_type, output_dir, 
                      convert_count, sampling_method, locations_method, outfile_loc, 
                      export_objects, road_noise, timer, verbose) {
  
  # Set up a SOCK cluster 
  num_workers <- min(num_places, parallel::detectCores(), 64) 
  cluster <- parallel::makeCluster(num_workers, type = "SOCK", outfile = outfile_loc)
  doParallel::registerDoParallel(cluster)
  
  # Export objects to the SOCK cluster  
  parallel::clusterExport(cl = cluster, varlist = export_objects, envir = environment())  
  
  # Run for-each to generate each place on a separate core 
  `%dopar%` <- foreach::`%dopar%`
  place <-  NULL
  region_list <- foreach::foreach(place = 1:num_places, 
                         .packages = c("plyr", "methods", "sp", "rgeos", "data.table", "mipfp", "quadprog"),
                         .errorhandling = 'pass', 
                         .export = export_objects) %dopar% {
                           
                         print(paste0("Region ", place, " out of ", num_places))
                         times <- spew_place(index = get("place"), 
                                    pop_table = pop_table, 
                                    shapefile = shapefile, 
                                    pums_h = pums_h, 
                                    pums_p = pums_p, 
                                    schools = schools, 
                                    workplaces = workplaces, 
                                    marginals = marginals, 
                                    output_type = output_type,
                                    sampling_method = sampling_method, 
                                    locations_method = locations_method, 
                                    convert_count = convert_count, 
                                    output_dir = output_dir,
                                    road_noise = road_noise, 
                                    timer = timer, 
                                    verbose = verbose)
                                                      
                           print(paste0("Finished ", pop_table[place, "place_id"]))
                           times
                         }
  
  parallel::stopCluster(cluster)
  return(region_list)
}

#' Run SPEW in Parallel with a Multicore backend 
#' 
#' Internal function, only called by the main spew function
#' 
#' @param num_places The number of regions (place_ids) to be generated for a given location 
#' @param pop_table dataframe where rows correspond to places where populations 
#' should be generated. Other requird columns are "n_house" and "puma_id"
#' @param shapefile sp class object used for assigning households to 
#' particular locations  
#' @param pums_h dataframe with microdata corresponding to housegolds 
#' @param pums_p dataframe with microdata corresponding to people
#' @param schools list with names "public" and "private" with a 
#' dataframe of schools corresponding to public or private, respectively
#' @param workplaces dataframe of workplaces with a workplace_id column, 
#' employees column, and stcotr column
#' @param marginals list of marginal population totals. Each element of the 
#' list contains the marginal totals of a separate variable 
#' @param output_type Default is "console" if we want to resulting population 
#' as an R variable. Alternative is "write", which is used on Olympus for writing 
#' out .csv files of the population
#' @param output_dir directory to write output if output_type = "write", NULL otherwise 
#' @param convert_count logical meant to indicate if we are going to convert 
#' population totals from people to household counts. Default: FALSE, assumes
#' the population is the total number of households 
#' @param sampling_method character indicating the type of sampling 
#' method to use, defaults to "uniform". Can also be "ipf" with appropriate marginal data. 
#' @param locations_method character indicating the type of location 
#' sampling to use, defaults to "uniform", can also be "roads". 
#' @param outfile_loc Defaults to "", so we print out the parallel run information. 
#' Only set to "/dev/null" for internal testing putposes.
#' @param export_objects objects for exporting to parallel backends 
#' @param road_noise Noise added to households during road-based sampling 
#' @param timer logical indicating we want to time the run 
#' @param verbose logical indicating we want to print output during the run.  Default is FALSE.#' 
#' 
#' @return region_list with output_type for all num_place locations 
spew_mc <- function(num_places, pop_table, shapefile, pums_h, pums_p, 
                    schools, workplaces, marginals, output_type, output_dir, 
                    convert_count, sampling_method, locations_method, outfile_loc, 
                    export_objects, road_noise, timer, verbose) {
  
  num_workers <- parallel::detectCores()
  region_list <- parallel::mclapply(X = 1:num_places, 
                                    FUN = spew_place, 
                                    pop_table = pop_table, 
                                    shapefile = shapefile, 
                                    pums_h = pums_h, 
                                    pums_p = pums_p, 
                                    schools = schools, 
                                    workplaces = workplaces, 
                                    marginals = marginals, 
                                    output_type = output_type,
                                    sampling_method = sampling_method, 
                                    locations_method = locations_method, 
                                    convert_count = convert_count, 
                                    output_dir = output_dir,
                                    road_noise = road_noise, 
                                    timer = timer, 
                                    verbose = verbose)
  
  return(region_list)
}

#' Run SPEW in Parallel with an MPI backend 
#' 
#' Internal function, only called by the main spew function
#' 
#' @param num_places The number of regions (place_ids) to be generated for a given location 
#' @param pop_table dataframe where rows correspond to places where populations 
#' should be generated. Other requird columns are "n_house" and "puma_id"
#' @param shapefile sp class object used for assigning households to 
#' particular locations  
#' @param pums_h dataframe with microdata corresponding to housegolds 
#' @param pums_p dataframe with microdata corresponding to people
#' @param schools list with names "public" and "private" with a 
#' dataframe of schools corresponding to public or private, respectively
#' @param workplaces dataframe of workplaces with a workplace_id column, 
#' employees column, and stcotr column
#' @param marginals list of marginal population totals. Each element of the 
#' list contains the marginal totals of a separate variable 
#' @param output_type Default is "console" if we want to resulting population 
#' as an R variable. Alternative is "write", which is used on Olympus for writing 
#' out .csv files of the population
#' @param output_dir directory to write output if output_type = "write", NULL otherwise 
#' @param convert_count logical meant to indicate if we are going to convert 
#' population totals from people to household counts. Default: FALSE, assumes
#' the population is the total number of households 
#' @param sampling_method character indicating the type of sampling 
#' method to use, defaults to "uniform". Can also be "ipf" with appropriate marginal data. 
#' @param locations_method character indicating the type of location 
#' sampling to use, defaults to "uniform", can also be "roads". 
#' @param outfile_loc Defaults to "", so we print out the parallel run information. 
#' Only set to "/dev/null" for internal testing putposes.
#' @param export_objects objects for exporting to parallel backends 
#' @param road_noise Noise added to households during road-based sampling 
#' @param timer logical indicating we want to time the run 
#' @param verbose logical indicating we want to print output during the run.  Default is FALSE.#' 
#' 
#' @export
#' 
#' @return region_list with output_type for all num_place locations 
spew_mpi <- function(num_places, pop_table, shapefile, pums_h, pums_p, 
                     schools, workplaces, marginals, output_type, output_dir, 
                     convert_count, sampling_method, locations_method, outfile_loc, 
                     export_objects, road_noise, timer, verbose) {
  # Set up the MPI workers
  num_workers <- Rmpi::mpi.universe.size()
  Rmpi::mpi.spawn.Rslaves(nslaves = num_workers)
  
  # Print out the processor information
  rk <- Rmpi::mpi.comm.rank(0)
  sz <- Rmpi::mpi.comm.size(0)
  name <- Rmpi::mpi.get.processor.name()
  if (verbose) { cat("Hello, rank " , rk , " out of " , sz , " on " , name, "\n") }

  # If we are on Olympus, make sure to switch to personal libraries
  # because the core olympus directories don't have our updated functions. 
  hostname <- system("hostname", intern = TRUE)
  print(paste0("Hostname: ", hostname))
  if (length(grep("olympus", hostname)) == 1) {
    print("On Olympus, switching lib-paths")
    username <- system("whoami", intern = TRUE)
    print(paste0("Username: ", username))
    print(.libPaths())
    Rmpi::mpi.bcast.cmd(username <- system("whoami", intern = TRUE))    
    Rmpi::mpi.bcast.cmd(personal_lib <- grep(username, .libPaths()))
    Rmpi::mpi.bcast.cmd(print(.libPaths()[personal_lib]))
    Rmpi::mpi.bcast.cmd(print(personal_lib))
    Rmpi::mpi.bcast.cmd(.libPaths(new = c(.libPaths()[personal_lib])))
    Rmpi::mpi.bcast.cmd(print("New Lib-Paths"))
    Rmpi::mpi.bcast.cmd(print(.libPaths()))
  }
  
  # Send the relevant data objects/packes to workers 
  Rmpi::mpi.bcast.cmd("requireNamespace(plyr)")
  Rmpi::mpi.bcast.cmd("requireNamespace(methods)")
  Rmpi::mpi.bcast.cmd("requireNamespace(sp)")
  Rmpi::mpi.bcast.cmd("requireNamespace(rgeos)")
  Rmpi::mpi.bcast.cmd("requireNamespace(data.table)")
  Rmpi::mpi.bcast.cmd("requireNamespace(mipfp)")
  Rmpi::mpi.bcast.cmd("requireNamespace(quadprog)")
  
  # Export functions to all of the workers ------
  for (obj in export_objects) {
    if (verbose) { cat("Sending To Workers: ", obj, fill = TRUE) }
    do.call("mpi.bcast.Robj2slave" , list (obj = as.name(obj)))
  }
  
  # Call SPEW for each one of the places 
  region_list <- Rmpi::mpi.applyLB(X = 1:num_places, 
                             FUN = spew_place, 
                             pop_table = pop_table, 
                             shapefile = shapefile, 
                             pums_h = pums_h, 
                             pums_p = pums_p, 
                             schools = schools, 
                             workplaces = workplaces, 
                             marginals = marginals, 
                             output_type = output_type,
                             sampling_method = sampling_method, 
                             locations_method = locations_method, 
                             convert_count = convert_count, 
                             output_dir = output_dir,
                             road_noise = road_noise, 
                             timer = timer, 
                             verbose = verbose)
  
  # Close the connections and MPI
  Rmpi::mpi.close.Rslaves()
  Rmpi::mpi.exit()
  
  return(region_list)
}

#' Generate synthetic ecosystem for single place 
#' 
#' Used by 'spew' to split the generated of synthetic ecosytstems 
#' into independent regions. 
#' 
#' @param index specfic row of pop-table to generate 
#' @param pop_table dataframe where rows correspond to places where populations 
#' should be generated. Other requird columns are "n_house" and "puma_id"
#' @param shapefile sp class object used for assigning households to 
#' particular locations  
#' @param pums_h dataframe with microdata corresponding to housegolds 
#' @param pums_p dataframe with microdata corresponding to people
#' @param schools list with names "public" and "private" with a 
#' dataframe of schools corresponding to public or private, respectively
#' @param workplaces dataframe of workplaces with a workplace_id column, 
#' employees column, and stcotr column
#' @param marginals list of marginal population totals. Each element of the 
#' list contains the marginal totals of a separate variable 
#' @param output_type Default is "console" if we want to resulting population 
#' as an R variable. Alternative is "write", which is used on Olympus for writing 
#' out .csv files of the population
#' @param sampling_method character indicating the type of sampling 
#' method to use, defaults to "uniform". Can also be "ipf" with appropriate marginal data. 
#' @param locations_method character indicating the type of location 
#' sampling to use, defaults to "uniform", can also be "roads". 
#' @param convert_count logical meant to indicate if we are going to convert 
#' population totals from people to household counts. Default: FALSE, assumes
#' the population is the total number of households 
#' @param output_dir directory to write output if output_type = "write", NULL otherwise 
#' @param road_noise Noise added to households during road-based sampling 
#' @param timer logical indicating we want to time the run 
#' @param verbose logical indicating we want to print output during the run.  Default is FALSE.#' 
#' 
#' @export
#' 
#' @return List of a synthetic ecosystem 
#' 
#' @examples 
#' data(tartanville)
#' tartanville_T1 <- spew_place(index = 1, 
#'                              pop_table = tartanville$pop_table, 
#'                              shapefile = tartanville$shapefile, 
#'                              pums_h = tartanville$pums_h, 
#'                              pums_p = tartanville$pums_p)
#'                              
spew_place <- function(index, pop_table, shapefile, pums_h, pums_p, 
                       schools = NULL, workplaces = NULL, marginals = NULL, 
                       output_type = "console", sampling_method = "uniform", 
                       locations_method = "uniform", convert_count = FALSE, 
                       output_dir = NULL, road_noise = .0002, 
                       timer = FALSE, verbose = FALSE) {
  if (timer == TRUE) { place_start_time <- Sys.time() }
  
  # Extract region-specific information such as household size, IDs, etc...
  n_house <- pop_table[index, "n_house"]
  if (n_house == 0) { print("Place has 0 Households!"); return(TRUE) } 
  n_house <- ccount(convert_count, pums_h$PERSONS, nrow(pums_p), nrow(pums_h), n_house)
  puma_id <- pop_table[index, "puma_id"]
  place_id <- pop_table[index, "place_id"]
  shapefile_id <- switch("shapefile_id" %in% names(pop_table), NULL, pop_table[index, "shapefile_id"])
  
  # Sample Households ---
  sampled_households <- sample_households(method = sampling_method, 
                                          n_house = n_house, 
                                          pums_h = pums_h, 
                                          pums_p = pums_p, 
                                          marginals = marginals,
                                          puma_id = puma_id, 
                                          place_id = place_id)
  
  # Sample Household Locations ---
  locations <- sample_locations(method = locations_method, 
                                place_id = place_id,
                                n_house = n_house, 
                                shapefile = shapefile, 
                                noise = road_noise, 
                                shapefile_id = shapefile_id)
  sampled_households$longitude <- locations@coords[, 1]
  sampled_households$latitude <- locations@coords[, 2]

  # Sample People --- 
  sampled_people <- sample_people(method = sampling_method, 
                                  household_pums = sampled_households, 
                                  pums_p = pums_p, 
                                  puma_id = puma_id, 
                                  place_id = place_id)

  # Schools --- 
  school_time <- 0
  if (!is.null(schools)) {
    school_start_time <- Sys.time()
    
    school_ids <- assign_schools(sampled_people, schools)
    sampled_people$school_id <- school_ids
    stopifnot("school_id" %in% names(sampled_people))
    
    school_time <- difftime(Sys.time(), school_start_time, units = "secs")
    school_time <- round(school_time, digits = 2)
  }
  
  # Workplaces ---
  workplace_time <- 0
  if (!is.null(workplaces)) {
    workplace_start_time <- Sys.time()
    
    workplace_ids <- assign_workplaces(sampled_people, workplaces)
    sampled_people$workplace_id <- workplace_ids
    stopifnot("workplace_id" %in% names(sampled_people))
    
    workplace_time <- difftime(Sys.time(), workplace_start_time, units = "secs")
    workplace_time <- round(school_time, digits = 2)
  }
  
  # Activities ---

  # Vectors ---
  
  # Return synthetic ecosystem if output_type equals 'console'
  if (output_type == "console") {
    return(list(households = sampled_households,
                people = sampled_people, 
                place_id = place_id,
                puma_id = puma_id))
  }
  
  stopifnot(output_type == "write")
  
  # Write the synthetic populations as CSV's
  write_data(df = sampled_households, place_id = place_id, 
             puma_id = puma_id, type = "household", 
             output_dir = output_dir)          
  write_data(df = sampled_people, place_id = place_id, 
             puma_id = puma_id, type = "people", 
             output_dir = output_dir)    

  # Collect diagnostic and summary information on this particular   
  # job and return this to the make_data function for analysis 
  place_time <- difftime(Sys.time(), place_start_time, units = "secs")
  place_time <- as.numeric(round(place_time, digits = 2))
  place_time_statement <- paste0("Time: ", place_time)
  
  hh_statement <- paste0("Households: ", nrow(sampled_households))
  people_statement <- paste0("People: ", nrow(sampled_people))  
  school_statement <- paste0("Schools: ", school_time)
  workplace_statement <- paste0("Workplaces: ", workplace_time)  
  place_statement <- paste0("Place: ", index) 
  total_place_statement <- paste0("Total Places: ", nrow(pop_table))
  place_name_statement <- paste0("Place Name: ", place_id)
  puma_statement <- paste0("Puma: ", puma_id)

  return(list(place_name = place_name_statement,
              place_num = place_statement, 
              total_places = total_place_statement, 
              total_households = hh_statement, 
              total_people = people_statement, 
              place_time = place_time_statement, 
              schools = school_statement, 
              workplaces = workplace_statement, 
              puma_name = puma_statement))
}

#' Adjust number of households 
#' 
#' Only changes things when population total represents the number 
#' of people (instead of households) 
#' 
#' @param convert_count logical, TRUE means make the switch 
#' @param persons vector of number of persons in each household 
#' @param np rows of person pums 
#' @param nh rows of household pums 
#' @param n_house original count of number of households 
#' 
#' @return updated n_house 
ccount <- function(convert_count = FALSE, persons, np, nh, n_house) {
  if (convert_count == TRUE) {
    if (is.null(persons)) { persons <- np / nh }
    n_house <- people_to_households(persons, n_house)
  }
  
  return(n_house)  
}

#' Convert a population count to household count 
#' 
#' @param hh_sizes numeric vector with the household 
#' sizes for this particular sampling region  
#' @param n_people numeric indicating the number 
#' of people in this specific region 
#' 
#' @return number of households 
people_to_households <- function(hh_sizes, n_people) {
  hh_avg <- mean(hh_sizes)
  num_households <- floor(n_people / hh_avg)
  return(num_households)
}

#' Output our final synthetic populations as csv's 
#' 
#' @param df dataframe with the final synthetic population 
#' @param place_id numeric indicating the name of the particular region samples
#' @param puma_id numeric indicating the puma this synthetic population belongs to 
#' @param type character vector with the type, either "household" or "people"
#' @param output_dir character containing the directory in which we want to 
#' write the final csv's 
#' @param append logical determining if we want to append to the current file 
#' 
#' @return data indicating the indices of people to sample 
write_data <- function(df, place_id, puma_id, type, output_dir, append = FALSE) {
  # Create the output directory for this PUMA 
  directory <- file.path(output_dir, paste0("output_", puma_id), "eco")  
  if (!dir.exists(directory)) {
    dir.create(directory, recursive = TRUE)          
  }
  
  filename <- file.path(directory, paste0(type, "_", as.character(place_id), ".csv"))
  filename <- remove_excess(filename)
  
  if (!requireNamespace("data.table", quietly = TRUE)) {
    write.csv(df, filename, append = append)
  } else {
    data.table::fwrite(df, filename, sep = ",", qmethod = "double", append = append)
  }
  
  return(TRUE)
}

#' Write out the final, formatted table 
#' 
#' @param pop_table the population table df 
#' @param output_dir character vector of the directory 
#' to write the population table 
#' 
#' @return logical TRUE if completed. As well as a written 
#' pop_table to the given output directory 
write_pop_table <- function(pop_table, output_dir) {
  pop_table$place_id <- remove_excess(pop_table$place_id)
  filename <- file.path(output_dir, "final_pop_table.csv")
  filename <- remove_excess(filename)
  write.csv(pop_table, filename)
  
  return(TRUE)
}

#' Write school environment 
#' 
#' @param schools school data-object 
#' @param env_dir filepath to write out the environment 
#' 
#' @return logical TRUE if completed successfully
write_schools <- function(schools, env_dir) {
  # Write out the public, then private school CSV's!
  public_df <- schools[["public"]]
  public_df$School <- remove_excess(name = public_df$School)
  public_filename <- file.path(env_dir, "public_schools.csv")  
  write.csv(public_df, public_filename)
  
  private_df <- schools[["private"]]
  private_df$School <- remove_excess(name = private_df$School)
  private_filename <- file.path(env_dir, "private_schools.csv")  
  write.csv(private_df, private_filename)
  
  return(TRUE)
}

#' Write workplaces environment 
#' 
#' @param workplaces school data-object 
#' @param env_dir filepath to write out the environment 
#' 
#' @return logical TRUE if completed successfully
write_workplaces <- function(workplaces, env_dir) {
  workplaces_filename <- file.path(env_dir, "workplaces.csv")  
  write.csv(workplaces, workplaces_filename)
  
  return(TRUE)
}

#' Write out information on each region 
#' 
#' @param region_list a list containing all of the 
#' summary and diagnostic information for each  
#'  
print_region_list <- function(region_list) {
  # Loop through each element of a list and 
  # print our each element within that list 
  for (region in seq_along(region_list)) {
    for (diag in seq_along(region_list[[region]])) {
      diag_statement <- region_list[[region]][[diag]]
      print(diag_statement)
    }
  }
  
  return(TRUE)
}

Try the spew package in your browser

Any scripts or data that you put into this service are public.

spew documentation built on Nov. 17, 2017, 7:36 a.m.