#' duplicateItems
#'
#' @details
#' Function that duplicates a large data.table, adding a "index" column to all rows in the output indicating which
#' instance of the duplication the row is associated with
#'
#' requires a column called "_reps" on the object to determine how many times it is to be duplicated
#'
#' A temporary "_TEMP_" column is added to `dt` by reference while the table is split into groups,
#' and removed again by reference straight afterwards, so the caller's table keeps its original columns.
#'
#' @param dt data.table
#' @param split_attribute name of attribute to split the items between worker tasks
#' @param ncores number of worker processes. 1 runs everything in the current process, any other value
#'   runs the groups on a future multisession plan with that many workers
#' @param indexStart starting number for the "index" value added to the item
#' @return data.table made by row-binding the result of duplicateItem() for every group, without the "_reps" column
#' @noRd
#'
duplicateItems <- function( dt, split_attribute, ncores=1, indexStart=1 )
{
  #add an additional column before splitting on it - so that the value we're really splitting on still appears in the output.
  dt[, `_TEMP_` := get(split_attribute) ]
  dt_split <- dt[, .(list(.SD)), by = `_TEMP_`][,V1]

  #remove by reference. `dt$x <- NULL` deep-copies a data.table, which is slow on a large table
  #and leaves the temporary column behind on the caller's object.
  data.table::set(dt, j = "_TEMP_", value = NULL)

  #each group carries its own repeat count in "_reps" (the same value on every row of the group)
  duplicate_int <- function(dta) {
    rep <- dta$`_reps`[1]
    return ( duplicateItem( dta, rep, indexStart ) )
  }

  if (ncores == 1) {
    duplicates <- purrr::map(dt_split, duplicate_int)
  } else {
    #plan() returns the plan that was active before; put it back on exit (also when a worker fails)
    #instead of forcing the session to 'sequential'.
    previous_plan <- future::plan(future::multisession, workers = ncores)
    on.exit(future::plan(previous_plan), add = TRUE)
    duplicates <- furrr::future_map(dt_split, duplicate_int)
  }

  duplicates <- data.table::rbindlist(duplicates, use.names=FALSE)

  #performance, putting this inside duplicate_int roughly doubles the execution time
  data.table::set(duplicates, j = "_reps", value = NULL)

  return (duplicates)
}
#' Duplicate stop_times
#'
#' @details
#' Duplicates the stop times of trips that have been split into several trips,
#' and puts the new trip id onto the duplicated stop_times.
#' All the work is delegated to duplicate_related_items().
#'
#' @param calendar calendar data.frame
#' @param stop_times stop_times data.frame
#' @param ncores number of processes for parallel processing (default = 1)
#' @noRd
#'
duplicate_stop_times <- function(calendar, stop_times, ncores = 1) {
  # columns kept on the returned stop_times
  stop_time_columns <- c(
    "trip_id",
    "arrival_time",
    "departure_time",
    "stop_id",
    "stop_sequence",
    "pickup_type",
    "drop_off_type",
    "schedule"
  )

  # It's pretty marginal doing this on multiple threads. With a typical sized day all GB file,
  # doing the split takes 2.4s and the duplication 7.8s (on one thread)
  # TODO look at avoiding the split if threads=1
  duplicate_related_items(
    calendar,
    stop_times,
    original_join_field = "schedule",
    new_join_field = "trip_id",
    outputColumnNames = stop_time_columns,
    ncores = ncores
  )
}
#' Duplicate related items
#'
#' @details
#' Duplicates the items that are related to calendar rows.
#' The calendar rows are expected to have been duplicated already, with every copy keeping the
#' same (now repeated) 'rowID'. The repeats of 'rowID' say which related items to duplicate and
#' how many copies are needed.
#' The related items carry an attribute <original_join_field> which joins back to 'rowID' on the calendar.
#'
#' After duplication the copies are joined back onto the input calendar rows (by rowID and copy index)
#' to pick up the calendar attribute <new_join_field>.
#'
#' <new_join_field> forms the new relation between the calendar items and the related items,
#' so must be unique.
#'
#' @param calendar calendar table (it is subset with data.table `with = FALSE` syntax)
#' @param related_items table of items to be replicated
#' @param original_join_field name of the column on related_items that joins to 'rowID' on calendar
#' @param new_join_field name of the calendar column to attach to the output items
#' @param outputColumnNames character vector of the columns to keep in the output
#' @param ncores number of processes for parallel processing (default = 1) (currently hangs/crashes if >1)
#' @noRd
#'
duplicate_related_items <- function(calendar, related_items, original_join_field, new_join_field, outputColumnNames, ncores = 1) {

  repeated_calendar <- calendar[duplicated(calendar$rowID), ]

  if (nrow(repeated_calendar) > 0) {
    # number of extra copies wanted per rowID (table() yields columns Var1 and Freq)
    copies_needed <- as.data.frame(table(repeated_calendar$rowID))
    copies_needed$Var1 <- as.integer(as.character(copies_needed$Var1))

    # attach the count to the related items, so it can be retrieved while duplicating
    related_items <- dplyr::left_join(
      related_items, copies_needed,
      by = stats::setNames("Var1", original_join_field)
    )
    related_items$`_reps` <- related_items$Freq

    # TODO: The could handle cases of non duplicated stoptimes within duplicate.stop_times.int
    # rather than splitting and rejoining, would bring code tidyness and speed improvements
    related_items_dup <- duplicateItems(related_items, original_join_field, ncores = ncores, indexStart = 1)

    # Number the calendar rows inside each rowID group 0..(group size - 1).
    # Index 0 is the original row; the copies (index starts at 1) therefore only ever match
    # the duplicated calendar rows.
    indexed_ids <- dplyr::group_by(calendar, rowID)
    indexed_ids <- dplyr::mutate(indexed_ids, Index = seq(0, dplyr::n()-1))
    indexed_ids <- as.data.frame(indexed_ids[, c("rowID", new_join_field, "Index")])

    # join on rowID + index to fetch the new, de-duplicated join id
    related_items_dup <- dplyr::left_join(
      related_items_dup, indexed_ids,
      by = stats::setNames(c("rowID","Index"), c(original_join_field,"index"))
    )

    related_items_dup <- related_items_dup[, outputColumnNames, with=FALSE]
  } else {
    # nothing to duplicate
    warning("duplicate_related_items: there were no duplicates detected. In real data this may indicate there has been an error earlier in the processing.")
    related_items_dup <- data.table::data.table()
  }

  # The original (first) calendar row of every rowID: join by rowID to find its join id.
  first_calendar_rows <- calendar[!duplicated(calendar$rowID), ]
  first_row_ids <- first_calendar_rows[, c("rowID", new_join_field), with=FALSE]

  kept_items <- dplyr::left_join(
    related_items, first_row_ids,
    by = stats::setNames("rowID", original_join_field)
  )

  # when routes are cancelled their stop times are left without valid trip_ids - remove those rows
  # (this only applies to the non-duplicated rows)
  kept_items <- kept_items[!is.na(kept_items[[new_join_field]]), ]
  kept_items <- kept_items[, outputColumnNames, with=FALSE]

  # originals first, then the copies
  data.table::rbindlist(list(kept_items, related_items_dup), use.names=FALSE)
}
# Repairs the arrival_time / departure_time columns of stop_times and returns the table.
#  - a "000000" departure on a set-down only ("D") row takes the arrival time
#  - a "000000" arrival on a take-up only ("U") row takes the (already repaired) departure time
#  - a missing arrival is then copied from the departure, and a missing departure from the arrival
fixStopTimeData <- function(stop_times)
{
  arrivals <- stop_times$arrival_time
  departures <- stop_times$departure_time
  activity <- stop_times$Activity

  # Fix arrival_time / departure_time being 0000 for pick up only or drop off only trains
  departures <- dplyr::if_else(departures == "000000" & activity == "D", arrivals, departures)
  arrivals <- dplyr::if_else(arrivals == "000000" & activity == "U", departures, arrivals)

  # fix missing arrival / departure times by copying from the other time.
  no_arrival <- is.na(arrivals)
  arrivals[no_arrival] <- departures[no_arrival]

  no_departure <- is.na(departures)
  departures[no_departure] <- arrivals[no_departure]

  stop_times$departure_time <- departures
  stop_times$arrival_time <- arrivals
  return(stop_times)
}
#' Strip White Space
#'
#' @details
#' Strips trailing whitespace from all character columns; values that are empty
#' after stripping are converted to NA. Columns of any other type are left untouched.
#'
#' A data.table input is modified in-place and returned to the caller.
#' Any other input (legacy data.frame mode) is returned as a modified copy.
#'
#' @param dt data table or data frame (for legacy mode)
#' @return the table, with its character columns stripped
#' @noRd
#'
strip_whitespace <- function(dt) {
  # vapply rather than sapply: a table without columns gives logical(0) here instead of list(),
  # so the column selection below keeps working.
  is_char <- vapply(dt, is.character, logical(1))

  # strip one character vector; values left empty become NA
  strip_column <- function(val) {
    val <- trimws(val, which = "right")
    val[!is.na(val) & val == ""] <- NA_character_
    val
  }

  if (inherits(dt, "data.table")) {
    # assign by reference, one column at a time
    for (col_name in names(dt)[is_char]) {
      data.table::set(dt, j = col_name, value = strip_column(dt[[col_name]]))
    }
  } else {
    # Legacy mode. Only the character columns are rewritten (addressed by position):
    # running trimws() over a numeric column would silently turn it into character.
    for (col_index in which(is_char)) {
      dt[[col_index]] <- strip_column(dt[[col_index]])
    }
  }

  return (dt)
}
# Builds the "Arrival Time" and "Departure Time" columns of dt by running processOneTime()
# once per column; each entry is target field, scheduled (WTT) source field, public source field.
process_times <- function(dt, working_timetable) {
  time_fields <- list(
    c("Arrival Time", "Scheduled Arrival Time", "Public Arrival Time"),
    c("Departure Time", "Scheduled Departure Time", "Public Departure Time")
  )

  for (fields in time_fields) {
    dt <- processOneTime(dt, working_timetable, fields[1], fields[2], fields[3])
  }

  return(dt)
}
# Does in-place modification of the input data.table (and returns it).
# Writes <targetField> from: the public time if it is set, otherwise the WTT time if it is set,
# otherwise the pass time. While doing so the missing seconds are filled in
# ("00", or "30" when a trailing 'H' is indicated).
# When working_timetable is TRUE the public time is ignored and the WTT / pass value is always used.
# Nothing is changed if the table has no <sourceFieldWtt> column.
processOneTime <- function(dt, working_timetable, targetField, sourceFieldWtt, sourceField)
{
  columnNames <- colnames(dt)

  if (!(sourceFieldWtt %in% columnNames))
  {
    return (dt)
  }

  # WTT value, replaced by the pass time (when the table has one) wherever the WTT value is blank
  wttValues <- dt[[sourceFieldWtt]]
  if ("Scheduled Pass" %in% columnNames)
  {
    wttValues <- data.table::fifelse( " "==wttValues, dt[["Scheduled Pass"]], wttValues )
  }

  # "HHMMH" -> "HHMM30", then "HHMM " -> "HHMM00"
  wttTimes <- gsub("^(\\d{4})H$", "\\130", wttValues)
  wttTimes <- gsub("^(\\d{4}) $", "\\100", wttTimes)

  if (working_timetable)
  {
    newValues <- wttTimes
  }
  else
  {
    # If there is no Public Arrival time this field will default to 0000. (we will use WTT instead)
    publicValues <- dt[[sourceField]]
    newValues <- data.table::fifelse( "0000"==publicValues,
                                      wttTimes,
                                      gsub("^(\\d{4})$", "\\100", publicValues) )
  }

  data.table::set(dt, j = targetField, value = newValues)

  return (dt)
}
# Process Activity Codes
#
# Cuts the fixed width Activity field of every row into two-character codes (six per row),
# blanks the codes that are not wanted and rewrites the Activity column (by reference) as a
# comma separated list of the remaining codes with all whitespace removed.
#
# dt          data.table with a character column "Activity"
# public_only when TRUE only the passenger codes listed in 'acts' are kept, and rows left
#             with no activity at all are dropped from the returned table
#
# Returns the processed table. A table without rows is returned unchanged.
process_activity <- function(dt, public_only) {
  # With no rows there are no codes to extract: unlist() then yields NULL and matrix() fails on it.
  if (nrow(dt) == 0L)
  {
    return(dt)
  }

  # if ( any( 12 != nchar(dt$Activity) ) )
  # {
  # stop("bad input data in process_activity(), all Activity fields should be 12 chars long")
  # }
  # don't really need this test since we're reading in from fixed width files

  #performance, runs about twice as fast if we do processing outside data.table then insert it later
  splitActivity <- unlist( stringi::stri_extract_all_regex(dt$Activity, ".{2}") )
  # one row per input row, one column per two-character code
  splitActivityMat <- matrix(splitActivity, ncol=6, byrow=TRUE)

  # Filter to stops for passengers
  #see https://wiki.openraildata.com/index.php?title=Activity_codes for definitions
  acts <- c(
    "TB", # Train Starts
    "T " , # Stops to take up and set down passengers
    "D ", # Stops to set down passengers
    "U ", # Stops to take up passengers
    "R ", # Request stop
    "TF" # Train Finishes
  )

  if(public_only)
  {
    allowed <- (" "!=splitActivityMat) & (splitActivityMat %in% acts)
  }
  else
  {
    allowed <- (" "!=splitActivityMat)
  }

  splitActivityMat[!allowed] <- ""

  activity <- sprintf("%s,%s,%s,%s,%s,%s", splitActivityMat[,1], splitActivityMat[,2], splitActivityMat[,3], splitActivityMat[,4], splitActivityMat[,5], splitActivityMat[,6] )

  #replace multiple comma with single comma, remove whitespace, remove leading comma, remove trailing comma.
  activity <- gsub(",+", ",", activity)
  data.table::set(dt, j="Activity", value = gsub("\\s+|^,|,$", "", activity))

  #remove rows with no activity we're interested in (there is no activity at 'pass' locations)
  if(public_only)
  {
    dt <- dt[ ""!=dt$Activity ]
  }

  return(dt)
}
# Resolves the agency lookup table.
#
# agency  either the string "atoc_agency" (use the data loaded by load_data("atoc_agency")),
#         the path of a csv file to read, or an object that is not a character vector,
#         which is returned as it is.
#
# Returns the agency table. Stops if the csv file does not exist, or if the loaded / read data
# is not a data.frame with at least one row.
getCachedAgencyData <- function(agency = "atoc_agency")
{
  if(inherits(agency,"character"))
  {
    if(agency == "atoc_agency")
    {
      # presumably load_data() makes an object called 'atoc_agency' visible here - verify against load_data()
      load_data("atoc_agency")
      agency <- atoc_agency
    }
    else #TODO test column names
    {
      # assert_* raises an error; check_* only returns TRUE or a message string, so the
      # result of the check used to be thrown away
      checkmate::assert_file_exists(agency)
      agency <- utils::read.csv(agency, stringsAsFactors = FALSE)
    }

    if ( !inherits(agency, "data.frame") || 0==nrow(agency) ){ stop("failed to load atoc_agency data.") }
  }

  return (agency)
}
# Resolves the stop location table and returns it with stop_lat / stop_lon rounded to 5 decimal places.
#
# locations  either the string "tiplocs" (use the data loaded by load_data("tiplocs")),
#            the path of a csv file to read, or an object that is not a character vector,
#            which is used directly.
#
# An 'sf' object has its coordinates extracted into the columns stop_lat (Y) and stop_lon (X),
# its geometry dropped, and is converted to a data.table. Anything else is expected to carry
# stop_lat and stop_lon columns already.
#
# Stops if the csv file does not exist, or if the loaded / read data is not a data.frame
# with at least one row.
getCachedLocationData <- function(locations = "tiplocs")
{
  if(inherits(locations,"character"))
  {
    if(locations == "tiplocs")
    {
      # presumably load_data() makes an object called 'tiplocs' visible here - verify against load_data()
      load_data("tiplocs")
      locations <- tiplocs
    }
    else
    {
      # assert_* raises an error; check_* only returns TRUE or a message string, so the
      # result of the check used to be thrown away
      checkmate::assert_file_exists(locations)
      locations <- utils::read.csv(locations, stringsAsFactors = FALSE)
    }

    if ( !inherits(locations, "data.frame") || 0==nrow(locations) ){ stop("failed to load tiploc data.") }
  }

  # Get the Station Locations
  if (inherits(locations, "sf"))
  {
    stops <- cbind(locations, sf::st_coordinates(locations))
    stops <- sf::st_drop_geometry(stops)
    stops <- data.table::as.data.table(stops)
    data.table::setnames(stops, old = c("Y", "X"), new = c("stop_lat", "stop_lon"))
  }
  else #TODO test column names
  {
    stops <- locations
  }

  stops$stop_lat <- round(stops$stop_lat, 5)
  stops$stop_lon <- round(stops$stop_lon, 5)

  return (stops)
}
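# NOTE: the two lines below are footer text from the web page this file was captured from; they are not R code.
# Add the following code to your website.
# For more information on customizing the embed code, read Embedding Snippets.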