R/create_ride_data.R

Defines functions create_ride_data

Documented in create_ride_data

#' Find ride trips
#'
#' Find ride sharing trips by Uber and Lyft.
#'
#' Reads the FHV base list and the trip files of groups 2 to 6 into Spark,
#' keeps trips whose dispatching base has `dba_category` `"lyft"` or
#' `"uber"`, and counts them per category, pickup date, pickup hour and
#' pickup zone. The combined counts, minus the excluded pickup zones, are
#' stored as the Spark table `rides`. Every intermediate Spark table the
#' function creates is dropped on exit, including when an error occurs.
#'
#' @param sc Spark connection
#' @param data_dir Directory containing `fhv_bases.csv` and the
#'   `fhvtrips/group2` to `fhvtrips/group6` trip folders.
#' @param excluded_zones Pickup zone IDs (`PULocationID`) removed from the
#'   result. Defaults to `drop_zones`, looked up from the function's
#'   environment exactly as before. An empty vector keeps every zone.
#'
#' @return `NULL`, invisibly. Called for its side effect of creating the
#'   Spark table `rides` with columns `dba_category`, `pickup_date`,
#'   `pickup_hour`, `pickup_zone` and `n`.
#'
#' @export

create_ride_data <- function(sc,
							 data_dir = "../gozentosun2021_rawdata",
							 excluded_zones = drop_zones) {

	# Resolve and validate the zone list before touching Spark, so a missing
	# or malformed value fails fast instead of after all inputs are read.
	# Coercing to integer also keeps the values safe to splice into SQL.
	zone_ids <- unique(as.integer(excluded_zones))
	if (anyNA(zone_ids)) {
		stop("`excluded_zones` must contain only integer zone IDs.", call. = FALSE)
	}

	group_ids <- 2:6
	raw_tables <- paste0("rides_group", group_ids)
	count_tables <- paste0("table", group_ids)
	intermediate_tables <- c("bases", "bases_all", raw_tables, count_tables)

	# Drop intermediate tables on every exit path. IF EXISTS tolerates tables
	# that were never created because an earlier step failed; a failed drop
	# is reported as a warning so it cannot replace the original error.
	on.exit({
		for (table_name in intermediate_tables) {
			tryCatch(
				sparklyr::sdf_sql(sc, paste0("DROP TABLE IF EXISTS ", table_name)),
				error = function(e) {
					warning("Could not drop Spark table `", table_name, "`: ",
							conditionMessage(e), call. = FALSE)
				}
			)
		}
	}, add = TRUE)

	bases_schema <- c(base_number = "character",
					  base_name = "character",
					  dba = "character",
					  dba_category = "character")

	# Column names and types for trip groups 2..6. The names are assumed to
	# follow each group's CSV column order (group4's files order differently).
	trip_schemas <- list(
		c(dispatching_base_num = "character",
		  pickup_datetime = "character",
		  dropoff_datetime = "character",
		  PULocationID = "integer",
		  DOLocationID = "integer"),
		c(dispatching_base_num = "character",
		  pickup_datetime = "character",
		  dropoff_datetime = "character",
		  PULocationID = "integer",
		  DOLocationID = "integer",
		  SR_Flag = "integer"),
		c(pickup_datetime = "character",
		  dropoff_datetime = "character",
		  PULocationID = "integer",
		  DOLocationID = "integer",
		  SR_Flag = "integer",
		  dispatching_base_num = "character",
		  drop_me = "character"),
		c(dispatching_base_num = "character",
		  pickup_datetime = "character",
		  dropoff_datetime = "character",
		  PULocationID = "integer",
		  DOLocationID = "integer",
		  SR_Flag = "integer"),
		c(hvfhs_license_num = "character",
		  dispatching_base_num = "character",
		  pickup_datetime = "character",
		  dropoff_datetime = "character",
		  PULocationID = "integer",
		  DOLocationID = "integer",
		  SR_Flag = "integer")
	)

	# Bases registry, narrowed to the Lyft and Uber bases used for the join.
	sparklyr::spark_read_csv(sc,
							 "bases_all",
							 file.path(data_dir, "fhv_bases.csv"),
							 memory = FALSE,
							 columns = bases_schema)

	sparklyr::sdf_sql(sc, "SELECT base_number, dba_category
						   FROM bases_all
						   WHERE dba_category IN ('lyft', 'uber')") %>%
		dplyr::compute("bases")

	# Register each trip group as a lazy (memory = FALSE) Spark table.
	for (i in seq_along(group_ids)) {
		sparklyr::spark_read_csv(sc,
								 raw_tables[[i]],
								 file.path(data_dir, "fhvtrips",
										   paste0("group", group_ids[[i]])),
								 memory = FALSE,
								 columns = trip_schemas[[i]])
	}

	# One aggregation query per group. The SUBSTRING offsets presumably
	# assume "YYYY-MM-DD HH..." pickup timestamps (date = chars 1-10,
	# hour = chars 12-13) — confirm against the raw files.
	count_queries <- paste0(
		"SELECT bases.dba_category, temp1.pickup_date, temp1.pickup_hour, temp1.pickup_zone, COUNT(*) AS n
		 FROM (
			SELECT dispatching_base_num AS base_num,
				   SUBSTRING(pickup_datetime, 1, 10) AS pickup_date,
				   SUBSTRING(pickup_datetime, 12, 2) AS pickup_hour,
				   PULocationID AS pickup_zone
			FROM ", raw_tables, "
		 ) temp1
		 INNER JOIN bases
		 ON temp1.base_num = bases.base_number
		 GROUP BY bases.dba_category, temp1.pickup_date, temp1.pickup_hour, temp1.pickup_zone")

	purrr::walk2(count_queries, count_tables, function(query, table_name) {
		sparklyr::sdf_sql(sc, query) %>%
			dplyr::compute(table_name)
	})

	# Stack the per-group counts and drop excluded pickup zones. An empty
	# exclusion list must not emit `NOT IN ()`, which is invalid SQL.
	union_sql <- paste0("SELECT * FROM ", count_tables, collapse = "\nUNION ALL\n")
	rides_sql <- paste0("SELECT *\nFROM (\n", union_sql, "\n) AS temp")
	if (length(zone_ids) > 0) {
		rides_sql <- paste0(rides_sql, "\nWHERE pickup_zone NOT IN (",
							paste(zone_ids, collapse = ", "), ")")
	}

	sparklyr::sdf_sql(sc, rides_sql) %>%
		dplyr::compute("rides")

	invisible(NULL)
}
hktosun/gozentosun2021 documentation built on Dec. 20, 2021, 4:44 p.m.