R/create_bike_data.R

Defines functions create_bike_data

Documented in create_bike_data

#' Find bike stations
#'
#' Find bike stations and associated taxi zones.
#'
#' @importFrom rlang .data
#' @param sc Spark connection
#'
#' @export

create_bike_data <- function(sc){

	bikes_schema <- c(tripduration = "integer",
					  pickup_datetime = "character",
					  dropoff_datetime = "character",
					  pickup_station = "integer",
					  pickup_station_name = "character",
					  pickup_station_lat = "numeric",
					  pickup_station_lon = "numeric",
					  dropoff_station = "integer",
					  dropoff_station_name = "character",
					  dropoff_station_lat = "numeric",
					  dropoff_station_lon = "numeric",
					  bike_id = "integer",
					  usertype = "character",
					  birth_year = "integer",
					  gender = "integer")


	sparklyr::spark_read_csv(sc,
							 "bikes",
							 "../gozentosun2021_rawdata/biketrips",
							 memory = FALSE,
							 columns = bikes_schema)


	drop_stations <- c(255, 3000, 3014, 3017, 3019, 3020,
						3036, 3040, 3197, 3215, 3219, 3240,
						3245, 3248, 3250, 3266, 3432, 3446,
						3450, 3468, 3470, 3480, 3485, 3487,
						3488, 3636, 3650, 3672, 3683, 3685,
						3385, 3252, 3550, 3253, 3257, 3719,
						3239)

	drop_stations_query <- paste0("(", paste(drop_stations, collapse = ", "), ")")

	bike_stations <- sparklyr::sdf_sql(sc,
					  paste0("SELECT DISTINCT pickup_station AS station, pickup_station_lat AS lat, pickup_station_lon AS lon
							  FROM bikes
							  WHERE pickup_station NOT IN ", drop_stations_query)) %>%
		dplyr::collect()


	taxi_zones <- sf::st_read("../gozentosun2021_shapefiles/taxi_zones/taxi_zones.dbf") %>%
		dplyr::select(zone_id = .data$locationid) %>%
		dplyr::filter(!.data$zone_id %in% drop_zones)


	station_zone <- bike_stations %>%
		sf::st_as_sf(coords = c("lon", "lat"), crs = 4326) %>%
		sf::st_join(taxi_zones, join = sf::st_within) %>%
		tibble::as_tibble() %>%
		dplyr::select(.data$station, .data$zone_id) %>%
		dplyr::filter(!is.na(zone_id))

	dplyr::copy_to(sc, station_zone, "station_zone")


	sparklyr::sdf_sql(sc,
					  "
					  SELECT zone_id, SUM(n) AS n_biketrips, COUNT(*) n_bikestation
					  FROM(
						  SELECT temp2.month, temp2.pickup_station, temp2.n, station_zone.zone_id
						   FROM(
								SELECT month, pickup_station, COUNT(*) AS n
								FROM(
									SELECT CONCAT(SUBSTRING(pickup_datetime, 1, 8), '01') AS month, pickup_station
									FROM bikes) AS temp
								GROUP BY month, pickup_station) AS temp2
						  INNER JOIN station_zone
						  ON station_zone.station = temp2.pickup_station) AS temp3
					  WHERE month = '2018-12-01'
					  GROUP BY zone_id") %>%
		dplyr::compute("bike_zones")

	sparklyr::sdf_sql(sc, "DROP TABLE bikes")

	return(NULL)



}
hktosun/gozentosun2021 documentation built on Dec. 20, 2021, 4:44 p.m.