#' Create a tidied participant data.table
#'
#' @param stage_profiles the output of `query_stage_profiles()`.
#' @param stage_uuids the output of `query_stage_uuids()`.
#'
#' @note The `update_ts` field can be used to infer the signup datetime of each user.
#'
#' @return a data.table
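#' @examples
#' \dontrun{
#' # Illustrative sketch only: assumes `cons` is an open connection to the
#' # mongodb instance; the exact arguments of the query helpers may differ.
#' participants <- tidy_participants(
#'   stage_profiles = query_stage_profiles(cons),
#'   stage_uuids = query_stage_uuids(cons)
#' )
#' }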
#' @export
tidy_participants <- function(stage_profiles, stage_uuids) {
merged <- merge(
x = stage_profiles,
y = stage_uuids[, c("update_ts") := NULL],
by = "user_id"
)
# print(names(merged))
max_cols_to_purge <- c("uuid", "source", "device_token", "mpg_array", "curr_sync_interval")
cols_to_purge <- intersect(names(merged), max_cols_to_purge)
# print(cols_to_purge)
merged %>%
.[, eval(cols_to_purge) := NULL] %>%
data.table::setcolorder(c("user_email", "user_id"))
}
#' Tidy the 'cleaned trips' data.frame into an sf object.
#'
#' @param cleaned_trips a data.table output from `query_cleaned_trips()`.
#' @param project_crs an EPSG code. Defaults to 4326.
#' @param smallest_rounding_digit an integer; the number of decimal places used when rounding the derived duration and distance columns.
#'
#' @return a spatial data.frame of class sf.
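#' @examples
#' \dontrun{
#' # Illustrative sketch only: assumes `cons` is an open mongodb connection and
#' # that `query_cleaned_trips()` is called with suitable arguments.
#' trips_sf <- tidy_cleaned_trips(query_cleaned_trips(cons), project_crs = 4326)
#' }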
#' @export
tidy_cleaned_trips <- function(cleaned_trips, project_crs = 4326, smallest_rounding_digit = 2) {
message("Finished query, about to clean trips")
cleaned_trips_sf <-
# flatten out names and remove unnecessary columns
cleaned_trips %>%
dplyr::select(-dplyr::contains(
c(
".hour",
".second",
".minute",
".day",
".month",
".year",
".weekday"
)
)) %>%
setnames(gsub("data.", "", names(.))) %>%
setnames(gsub("user_input", "", names(.))) %>%
janitor::clean_names() %>%
dplyr::select(-metakey, -metaplatform) %>%
dplyr::mutate(
start_fmt_time0 = start_fmt_time,
end_fmt_time0 = end_fmt_time,
start_fmt_time = lubridate::as_datetime(start_fmt_time0), # converts to UTC
end_fmt_time = lubridate::as_datetime(end_fmt_time0), # converts to UTC
start_local_time = purrr::map2(start_fmt_time, start_local_dt_timezone, ~ format(.x, tz = .y, usetz = TRUE)), # a timestamp but has to be a string
end_local_time = purrr::map2(end_fmt_time, end_local_dt_timezone, ~ format(.x, tz = .y, usetz = TRUE)) # a timestamp but has to be a string
) %>%
tidyr::unnest(c("start_local_time", "end_local_time")) %>%
# convert to data.table for speed!
data.table::setDT(.) %>%
# create a geometry column but is not a sf object yet
.[,
geometry := list(list(sf::st_linestring(matrix(
as.numeric(unlist(
c(start_loc_coordinates, end_loc_coordinates)
)),
ncol = 2,
byrow = TRUE
)))),
by = 1:nrow(.)
] %>%
# add lat and lon columns for both start and end locations
dplyr::mutate(
end_lat_lon = gsub("c\\(|\\)| ", "", paste(end_loc_coordinates)),
start_lat_lon = gsub("c\\(|\\)| ", "", paste(start_loc_coordinates))
) %>%
tidyr::separate(
end_lat_lon,
into = c("end_lon", "end_lat"),
sep = ",",
convert = TRUE
) %>%
tidyr::separate(
start_lat_lon,
into = c("start_lon", "start_lat"),
sep = ",",
convert = TRUE
) %>%
# convert into a sf object, this allows us to use spatial mapping functions
# from packages like `ggplot2` and `mapview`.
# this requires the curly braces because we are re-using the placeholder
# see: the section 'Re-using the placeholder for attributes' in
# https://magrittr.tidyverse.org/
{
sf::st_sf(dplyr::select(., -geometry),
geometry = .[["geometry"]],
crs = project_crs
)
} %>%
dplyr::select(-dplyr::starts_with(c("meta", "data_"))) %>%
dplyr::select(-dplyr::ends_with(
c(
"_ts",
"_ts_plus",
"_place",
"_loc_type",
"raw_trip",
"_lat",
"_lon"
# "_coordinates" # keep the start and end coordinates for later visualization
)
)) %>%
dplyr::select(
user_id,
start_fmt_time0,
start_local_dt_timezone,
start_fmt_time,
start_local_time,
end_fmt_time0,
end_local_dt_timezone,
end_fmt_time,
end_local_time,
dplyr::everything()
) %>%
# convert duration from seconds to minutes, and distance from metres to miles (~1609 m per mile) and kilometres
dplyr::mutate(
duration_min = round(duration / 60, smallest_rounding_digit),
distance_mi = round(distance / 1609, smallest_rounding_digit),
distance_km = round(distance / 1000, smallest_rounding_digit)
)
message("Finished cleaning trips")
return(cleaned_trips_sf)
}
tidy_cleaned_trips_by_timestamp <- function(df) {
# Within trips, metadata and data are list columns with multiple fields.
# user_input is a list column found in data
# Make a data table out of user_id and meta data.
dt <- df[, c("user_id", "metadata")] %>% as.data.table()
# Make a dataframe containing just the 'data' list-column
data_df <- df[, c("data")]
data_df_without_user_input <- data_df[, names(data_df) != "user_input"]
user_input_df <- df[["data"]][["user_input"]] %>% as.data.table()
# Combine the user_id/metadata data table with the 'data' dataframe.
dt <- cbind(dt, data_df_without_user_input)
# If there are no user inputs, only return dt without user input columns
if (nrow(user_input_df) == 0) {
message("There are no user inputs in the time range")
return(dt)
}
cbind(dt, user_input_df)
}
summarise_trips_without_trips <- function(participants, cons) {
confirmed_user_input_column <- getOption("emdash.confirmed_user_input_column")
trip_query <- query_trip_dates(cons, confirmed_user_input_column) %>%
as.data.table() %>%
normalise_uuid()
# Generate intermediate columns to use for summarizing trips.
helper_trip_cols <- trip_query %>%
.[, .(
user_id,
start_fmt_time = lubridate::as_datetime(trip_query$data.start_fmt_time), # convert trip start datetimes to UTC
end_fmt_time = lubridate::as_datetime(trip_query$data.end_fmt_time) # convert trip end datetimes to UTC
)] %>%
# Now add columns generated from start/end_fmt_time
.[, ":="
(
start_local_time = purrr::map2(start_fmt_time, trip_query$data.start_local_dt.timezone, ~ format(.x, tz = .y, usetz = TRUE)) %>%
as.character() %>% as.POSIXct(), # purrr::map2() returns a list, so coerce to character before converting with as.POSIXct()
end_local_time = purrr::map2(end_fmt_time, trip_query$data.end_local_dt.timezone, ~ format(.x, tz = .y, usetz = TRUE)) %>%
as.character() %>% as.POSIXct()
)]
# Write the trip summary columns
summ_trips <-
helper_trip_cols %>%
# adds the date of local datetime of trip
.[, date := lubridate::date(start_local_time)] %>%
# Generate summary columns
.[, .(
n_trips_today = sum(date == Sys.Date()),
n_active_days = data.table::uniqueN(date),
first_trip_datetime = min(start_fmt_time),
last_trip_datetime = max(start_fmt_time), # in UTC
first_trip_local_datetime = min(start_local_time),
last_trip_local_datetime = max(end_local_time)
), by = user_id] %>%
.[, n_days := round(as.numeric(difftime(last_trip_datetime, first_trip_datetime, units = "days")), 1)]
# Add the 'unconfirmed' column.
unconfirmed_summ <- trip_query %>%
.[is.na(trip_query[[confirmed_user_input_column]]), .(unconfirmed = .N), by = user_id]
# Count the number of trips per user
n_trips <- count_total_trips(cons)
summ_trips <- merge(n_trips, summ_trips, by = "user_id")
message("merging trip summaries with participants")
merge(participants, summ_trips, by = "user_id", all.x = TRUE) %>%
merge(., unconfirmed_summ, by = "user_id", all.x = TRUE) %>%
.[is.na(unconfirmed), unconfirmed := 0]
}
#' Create a summary of trips in data.table format.
#'
#' @param participants the output from `tidy_participants()`.
#' @param trips the output from `tidy_cleaned_trips()`.
#'
#' @return a data.table.
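#' @examples
#' \dontrun{
#' # Illustrative sketch only: `participants` and `trips` as produced by
#' # tidy_participants() and tidy_cleaned_trips().
#' summ <- summarise_trips(participants, trips)
#' }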
#' @export
summarise_trips <- function(participants, trips) {
summ_trips <-
trips %>%
sf::st_drop_geometry(.) %>%
data.table::setDT(.) %>%
.[, date := lubridate::date(lubridate::as_datetime(start_local_time))] %>%
# adds the date of local datetime of trip
.[, .(
n_trips = .N,
n_trips_today = sum(date == Sys.Date()), # compares date of local datetime of trip to date of local time of dashboard user
n_active_days = data.table::uniqueN(date),
first_trip_datetime = min(start_fmt_time), # in UTC
last_trip_datetime = max(start_fmt_time), # in UTC
first_trip_local_datetime = format(min(lubridate::as_datetime(start_local_time)), usetz = FALSE),
last_trip_local_datetime = format(max(lubridate::as_datetime(end_local_time)), usetz = FALSE)
), by = user_id] %>%
.[, n_days := round(as.numeric(difftime(last_trip_datetime, first_trip_datetime, units = "days")), 1)]
merge(participants, summ_trips, by = "user_id", all.x = TRUE)
}
#' Create a summary of server calls in data.table format.
#'
#' @param participants the output from `tidy_participants()`.
#' @param cons the connection to mongodb.
#'
#' @return a data.table.
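#' @examples
#' \dontrun{
#' # Illustrative sketch only: assumes `cons` is an open mongodb connection.
#' participants <- summarise_server_calls(participants, cons)
#' }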
#' @export
summarise_server_calls <- function(participants, cons) {
# Get all of the relevant server calls
usercache_get_summ <- query_usercache_get_summ(cons)
usercache_put_summ <- query_usercache_put_summ(cons)
diary_summ <- query_diary_summ(cons)
# Get the column names that are in participants before merging.
participant_cols <- names(participants)
# Merge each nonempty summary table with participants
# Current columns to be added to participants: (first/last)_(get/put/diary)_call
message("merging server call summaries with participants")
merged <- FALSE
if (nrow(usercache_get_summ) > 0) {
participants <- merge(participants, usercache_get_summ, by = "user_id", all.x = TRUE)
merged <- TRUE
}
if (nrow(usercache_put_summ) > 0) {
participants <- merge(participants, usercache_put_summ, by = "user_id", all.x = TRUE)
merged <- TRUE
}
if (nrow(diary_summ) > 0) {
participants <- merge(participants, diary_summ, by = "user_id", all.x = TRUE)
merged <- TRUE
}
message(ifelse(merged, "Finished merging server calls", "No server calls to merge"))
all_cols <- names(participants)
subset_cols <- all_cols[!all_cols %in% participant_cols] # get the columns not in the original participants columns. a[!a %in% b] = "a without b"
# Convert all server call time stamps to datetime by applying as_datetime to each server call column
return(
participants[, (subset_cols) := lapply(.SD, function(x) lubridate::as_datetime(x)), .SDcols = subset_cols]
)
}
#' Tidy the 'cleaned locations' data.frame into a tibble.
#'
#' @param cleaned_locations a data.table output from `query_cleaned_locations()`.
#'
#' @return a tibble.
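#' @examples
#' \dontrun{
#' # Illustrative sketch only: `cleaned_locations` is the raw query result for
#' # cleaned location documents (see the query_* helpers).
#' locations <- tidy_cleaned_locations(cleaned_locations)
#' }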
#' @export
tidy_cleaned_locations <- function(cleaned_locations) {
message("Finished query, about to clean locations")
tidied_locations <-
# flatten out names and select specific columns
cleaned_locations %>%
setnames(gsub("data.", "", names(.))) %>%
janitor::clean_names() %>%
dplyr::select(user_id, loc_type, longitude, latitude, loc_coordinates, fmt_time, idx, mode, section) %>%
# convert datetime
dplyr::mutate(
fmt_time_utc = lubridate::as_datetime(fmt_time)
)
message("Finished cleaning locations")
return(tidied_locations)
}
#' Tidy the 'server calls' data.frame into a tibble.
#'
#' @param server_calls a data.table output from `query_server_calls()`.
#'
#' @return a tibble.
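#' @examples
#' \dontrun{
#' # Illustrative sketch only: assumes `cons` is an open mongodb connection.
#' calls <- tidy_server_calls(query_server_calls(cons))
#' }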
#' @export
tidy_server_calls <- function(server_calls) {
message("Finished query, about to tidy server calls")
tidied_server_calls <-
# flatten out names and select specific columns
server_calls %>%
setnames(gsub("data.", "", names(.))) %>%
janitor::clean_names() %>%
dplyr::select(user_id, name, ts, reading) %>%
# convert datetime
dplyr::mutate(
fmt_time_utc = lubridate::as_datetime(ts)
)
message("Finished tidying server calls")
return(tidied_server_calls)
}
#' Combine the tidied trips and tidied locations to generate trajectories within each trip
#'
#' @param tidied_trips a data.frame from `tidy_cleaned_trips()`.
#' @param tidied_locations a tibble from `tidy_cleaned_locations()`.
#' @param project_crs an EPSG code. Defaults to 4326.
#'
#' @return a data.frame of class sf.
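#' @examples
#' \dontrun{
#' # Illustrative sketch only: inputs come from the tidy_* helpers above.
#' trajectories <- generate_trajectories(
#'   tidied_trips = trips_sf,
#'   tidied_locations = locations,
#'   project_crs = 4326
#' )
#' }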
#' @export
generate_trajectories <- function(tidied_trips, tidied_locations, project_crs = 4326) {
message("About to generate trajectories")
# drop trip geometries (just a straight line between origin and destination) and set trips as a data.table
tidied_trips <- tidied_trips %>% sf::st_drop_geometry()
data.table::setDT(tidied_trips)
# set locations as data.table and duplicate location time to set an "interval"
data.table::setDT(tidied_locations)[, fmt_time_utc_end := fmt_time_utc]
# join locations to trips according to time interval overlap (all done in UTC time for consistency)
data.table::setkey(tidied_locations, user_id, fmt_time_utc, fmt_time_utc_end)
data.table::setkey(tidied_trips, user_id, start_fmt_time, end_fmt_time)
joined_trips_and_locs <- data.table::foverlaps(tidied_locations, tidied_trips, nomatch = NULL)
# convert locations to points
trajectories <- joined_trips_and_locs %>%
dplyr::mutate(location_points = purrr::map(loc_coordinates, sf::st_point)) %>%
sf::st_sf(crs = project_crs) %>%
# cast linestrings between points of the same trips
dplyr::group_by(dplyr::across(dplyr::all_of(colnames(tidied_trips)))) %>%
dplyr::summarise(do_union = FALSE) %>%
sf::st_cast("LINESTRING")
message("Trajectories created")
return(trajectories)
}
#' Convert columns to datetime
#'
#' @param .data a data.table object
#' @param cols the names of the columns to convert to datetime
#' @param tz time zone. Defaults to "Australia/Sydney".
#'
#' @return .data
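#' @examples
#' # A minimal, self-contained sketch on a made-up data.table:
#' dt <- data.table::data.table(ts = c("2021-01-01 10:00:00", "2021-01-02 11:30:00"))
#' convert_columns_to_datetime(dt, cols = "ts", tz = "Australia/Sydney")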
#' @export
convert_columns_to_datetime <-
function(.data, cols, tz = "Australia/Sydney") {
stopifnot(data.table::is.data.table(.data))
.data[, `:=`(c(cols), lapply(.SD, function(.x) {
lubridate::as_datetime(.x, tz = tz)
})), .SDcols = cols]
}
#' Normalise UUID
#'
#' Collapses the raw `uuid` (or list `user_id`) column into a plain character
#' `user_id` column.
#'
#' @param .data a data.frame.
#' @param keep_uuid logical. If `TRUE`, the original `uuid` column is kept.
#'
#' @return a data.frame
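#' @examples
#' # A minimal sketch with a made-up raw `uuid` column (real values come from mongodb):
#' dt <- data.table::data.table(uuid = list(as.raw(c(0x01, 0x2a, 0xff))))
#' normalise_uuid(dt)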
#' @export
normalise_uuid <- function(.data, keep_uuid = FALSE) {
if (nrow(.data) == 0) {
return(.data)
}
# return(.data)
if (!is.data.table(.data)) {
setDT(.data)
}
if ("uuid" %in% names(.data)) {
.data[, user_id := sapply(uuid, function(.x) paste0(unlist(.x), collapse = ""))]
if (!keep_uuid) {
.data[, uuid := NULL]
}
} else {
.data[, user_id := sapply(user_id, function(.x) paste0(unlist(.x), collapse = ""))]
}
.data
}
#' Anonymize user_id field
#'
#' @param .data a data.frame object
#'
#' @return .data with anonymized user_id
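#' @examples
#' # A minimal sketch: each distinct user_id is replaced by a randomly ordered label.
#' df <- data.frame(user_id = c("abc", "abc", "def"), n_trips = c(2, 3, 1))
#' anonymize_uuid(df)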
#' @export
anonymize_uuid <- function(.data) {
stopifnot(is.data.frame(.data))
if (!"user_id" %in% names(.data)) {
stop("There is no `user_id` field in `.data`.")
}
unique_uuid <- unique(.data$user_id)
anon_uuid <- paste0("user_", sample(length(unique_uuid)))
names(anon_uuid) <- unique_uuid
.data$user_id <- anon_uuid[.data$user_id]
.data
}
anonymize_uuid_if_required <- function(.data, flag = getOption("emdash.anon_locations")) {
checkmate::assert_flag(flag, null.ok = FALSE)
if (flag) {
message("Anonymize trajectories")
return(anonymize_uuid(.data))
}
.data
}
#' Convert character columns to datetime columns
#'
#' @param .data a data.frame.
#' @param cols columns to convert to datetime columns.
#' @param tz time zone. Defaults to "Australia/Sydney".
#'
#' @return .data
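#' @examples
#' # A minimal, self-contained sketch on a made-up data.table:
#' dt <- data.table::data.table(start = c("2021-03-01 08:00:00", "2021-03-02 09:15:00"))
#' convert_datetime_string_to_datetime(dt, cols = "start", tz = "Australia/Sydney")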
#' @export
convert_datetime_string_to_datetime <- function(.data, cols, tz = "Australia/Sydney") {
stopifnot(data.table::is.data.table(.data))
.data[, c(cols) := lapply(.SD, function(.x) {
lubridate::as_datetime(.x, tz = tz)
}), .SDcols = cols]
}