Nothing
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# ------- Read RDD ------------
#' Create a SpatialRDD from an external data source.
#'
#' Import spatial object from an external data source into a Sedona SpatialRDD.
#'
#' @param sc A `spark_connection`.
#' @param location Location of the data source.
#' @param type Type of the SpatialRDD (must be one of "point", "polygon", or
#' "linestring").
#' @param has_non_spatial_attrs Whether the input contains non-spatial
#' attributes.
#' @param storage_level Storage level of the RDD (default: MEMORY_ONLY).
#' @param repartition The minimum number of partitions to have in the resulting
#' RDD (default: 1).
#'
#' @name sedona_spatial_rdd_data_source
#' @keywords internal
NULL
#' Create a typed SpatialRDD from a delimiter-separated values data source.
#'
#' Create a typed SpatialRDD (namely, a PointRDD, a PolygonRDD, or a
#' LineStringRDD) from a data source containing delimiter-separated values.
#' The data source can contain spatial attributes (e.g., longitude and latitude)
#' and other attributes. Currently only inputs with spatial attributes occupying
#' a contiguous range of columns (i.e.,
#' \[first_spatial_col_index, last_spatial_col_index\]) are supported.
#'
#' @inheritParams sedona_spatial_rdd_data_source
#' @param delimiter Delimiter within each record. Must be one of
#' ',', '\\t', '?', '\\'', '"', '_', '-', '%', '~', '|', ';'
#' @param first_spatial_col_index Zero-based index of the left-most column
#' containing spatial attributes (default: 0).
#' @param last_spatial_col_index Zero-based index of the right-most column
#' containing spatial attributes (default: NULL). Note last_spatial_col_index
#' does not need to be specified when creating a PointRDD because it will
#' automatically have the implied value of (first_spatial_col_index + 1).
#' For all other types of RDDs, if last_spatial_col_index is unspecified, then
#' it will assume the value of -1 (i.e., the last of all input columns).
#'
#' @return A typed SpatialRDD.
#'
#' @examples
#' library(sparklyr)
#' library(apache.sedona)
#'
#' sc <- spark_connect(master = "spark://HOST:PORT")
#'
#' if (!inherits(sc, "test_connection")) {
#' input_location <- "/dev/null" # replace it with the path to your csv file
#' rdd <- sedona_read_dsv_to_typed_rdd(
#' sc,
#' location = input_location,
#' delimiter = ",",
#' type = "point",
#' first_spatial_col_index = 1L
#' )
#' }
#'
#' @family Sedona RDD data interface functions
#'
#' @export
sedona_read_dsv_to_typed_rdd <- function(sc,
                                         location,
                                         delimiter = c(",", "\t", "?", "'", "\"", "_", "-", "%", "~", "|", ";"),
                                         type = c("point", "polygon", "linestring"),
                                         first_spatial_col_index = 0L,
                                         last_spatial_col_index = NULL,
                                         has_non_spatial_attrs = TRUE,
                                         storage_level = "MEMORY_ONLY",
                                         repartition = 1L) {
  # Collapse `type` to one validated string before it is used in any
  # condition. Without this, leaving `type` at its default passes the whole
  # three-element choice vector to `if ()`, and a partial match such as "poly"
  # would select a Polygon RDD class but a LineString format mapper.
  type <- match.arg(type)
  delimiter <- to_delimiter_enum_value(sc, match.arg(delimiter))
  rdd_cls <- rdd_cls_from_type(type)
  first_spatial_col_index <- as.integer(first_spatial_col_index)

  if (identical(type, "point")) {
    # The point format mapper is constructed from the first index only, so an
    # explicit last index is accepted only when it equals the implied value.
    implied_last_index <- first_spatial_col_index + 1L
    if (!is.null(last_spatial_col_index) &&
      as.integer(last_spatial_col_index) != implied_last_index) {
      stop(
        "last_spatial_col_index must be either unspecified or be equal to ",
        "(first_spatial_col_index + 1) for PointRDD"
      )
    }
    fmt <- invoke_new(
      sc,
      "org.apache.sedona.core.formatMapper.PointFormatMapper",
      first_spatial_col_index,
      delimiter,
      has_non_spatial_attrs
    )
  } else {
    # An unspecified last index falls back to -1.
    last_spatial_col_index <- as.integer(last_spatial_col_index %||% -1L)
    fmt_cls <- paste0(
      "org.apache.sedona.core.formatMapper.",
      if (identical(type, "polygon")) "Polygon" else "LineString",
      "FormatMapper"
    )
    fmt <- invoke_new(
      sc,
      fmt_cls,
      first_spatial_col_index,
      last_spatial_col_index,
      delimiter,
      has_non_spatial_attrs
    )
  }

  invoke_new(
    sc,
    rdd_cls,
    java_context(sc),
    location,
    # Never request fewer than one partition; NULL is treated as 1.
    max(as.integer(repartition %||% 1L), 1L),
    fmt
  ) %>%
    set_storage_level(storage_level) %>%
    new_spatial_rdd(type)
}
#' (Deprecated) Create a typed SpatialRDD from a shapefile or geojson data source.
#'
#' @description
#' `r lifecycle::badge("deprecated")`
#'
#' Constructors of typed RDDs (PointRDD, PolygonRDD, LineStringRDD) are soft-deprecated; use the non-typed versions instead.
#'
#' Create a typed SpatialRDD (namely, a PointRDD, a PolygonRDD, or a
#' LineStringRDD)
#' * `sedona_read_shapefile_to_typed_rdd`: from a shapefile data source
#' * `sedona_read_geojson_to_typed_rdd`: from a GeoJSON data source
#'
#'
#' @param sc A `spark_connection`.
#' @param location Location of the data source.
#' @param type Type of the SpatialRDD (must be one of "point", "polygon", or
#' "linestring").
#' @param has_non_spatial_attrs Whether the input contains non-spatial
#' attributes.
#' @param storage_level Storage level of the RDD (default: MEMORY_ONLY).
#' @param repartition The minimum number of partitions to have in the resulting
#' RDD (default: 1).
#'
#' @return A typed SpatialRDD.
#'
#' @examples
#' library(sparklyr)
#' library(apache.sedona)
#'
#' sc <- spark_connect(master = "spark://HOST:PORT")
#'
#' if (!inherits(sc, "test_connection")) {
#' input_location <- "/dev/null" # replace it with the path to your shapefile
#' rdd <- sedona_read_shapefile_to_typed_rdd(
#' sc,
#' location = input_location, type = "polygon"
#' )
#' }
#'
#' @family Sedona RDD data interface functions
#'
#' @export
sedona_read_shapefile_to_typed_rdd <- function(sc,
                                               location,
                                               type = c("point", "polygon", "linestring"),
                                               storage_level = "MEMORY_ONLY") {
  # Resolve `type` to a single validated choice. Otherwise the default
  # three-element vector is handed to `to_camel_case()`, whose `switch()`
  # requires a length-one value.
  type <- match.arg(type)

  lifecycle::deprecate_soft(
    "1.4.0",
    "sedona_read_shapefile_to_typed_rdd()",
    with = "sedona_read_shapefile()"
  )

  # The static reader method name is derived from the type,
  # e.g. "readToPolygonRDD".
  reader_method <- paste0("readTo", to_camel_case(type), "RDD")

  invoke_static(
    sc,
    "org.apache.sedona.core.formatMapper.shapefileParser.ShapefileReader",
    reader_method,
    java_context(sc),
    location
  ) %>%
    set_storage_level(storage_level) %>%
    new_spatial_rdd(type)
}
#' @export
#' @rdname sedona_read_shapefile_to_typed_rdd
sedona_read_geojson_to_typed_rdd <- function(sc,
                                             location,
                                             type = c("point", "polygon", "linestring"),
                                             has_non_spatial_attrs = TRUE,
                                             storage_level = "MEMORY_ONLY",
                                             repartition = 1L) {
  # Resolve `type` once so the same single, validated string is used both for
  # the RDD class name and for the wrapper built by `new_spatial_rdd()`.
  # Previously the wrapper received the raw argument, which is the full
  # choice vector when `type` is left at its default.
  type <- match.arg(type)

  lifecycle::deprecate_soft(
    "1.4.0",
    "sedona_read_geojson_to_typed_rdd()",
    with = "sedona_read_geojson()"
  )

  invoke_new(
    sc,
    rdd_cls_from_type(type),
    java_context(sc),
    location,
    sc$state$enums$delimiter$geojson,
    has_non_spatial_attrs,
    # Never request fewer than one partition; NULL is treated as 1.
    max(as.integer(repartition %||% 1L), 1L)
  ) %>%
    set_storage_level(storage_level) %>%
    new_spatial_rdd(type)
}
#' Read geospatial data into a Spatial RDD
#'
#' @description Import spatial object from an external data source into a Sedona SpatialRDD.
#' * `sedona_read_shapefile`: from a shapefile
#' * `sedona_read_geojson`: from a geojson file
#' * `sedona_read_wkt`: from a text file containing WKT geometries
#' * `sedona_read_wkb`: from a text file containing hex-encoded WKB geometries
#'
#' @param sc A `spark_connection`.
#' @param location Location of the data source.
#' @param storage_level Storage level of the RDD (default: MEMORY_ONLY).
#' @param repartition The minimum number of partitions to have in the resulting
#' RDD (default: 1).
#' @param allow_invalid_geometries Whether to allow topology-invalid
#' geometries to exist in the resulting RDD.
#' @param skip_syntactically_invalid_geometries Whether to allow Sedona to
#' automatically skip syntax-invalid geometries, rather than throwing
#' errors.
#' @param wkt_col_idx Zero-based index of column containing WKT data
#' (default: 0).
#' @param wkb_col_idx Zero-based index of column containing hex-encoded WKB data
#' (default: 0).
#'
#' @return A SpatialRDD.
#'
#' @examples
#' library(sparklyr)
#' library(apache.sedona)
#'
#' sc <- spark_connect(master = "spark://HOST:PORT")
#'
#' if (!inherits(sc, "test_connection")) {
#' input_location <- "/dev/null" # replace it with the path to your input file
#' rdd <- sedona_read_geojson(sc, location = input_location)
#' }
#'
#' @family Sedona RDD data interface functions
#'
#' @export
sedona_read_geojson <- function(sc,
                                location,
                                allow_invalid_geometries = TRUE,
                                skip_syntactically_invalid_geometries = TRUE,
                                storage_level = "MEMORY_ONLY",
                                repartition = 1L) {
  # Load the input through `textFile` on the Java context. The partition
  # count is floored at 1, and a NULL `repartition` counts as 1.
  lines_rdd <- invoke(
    java_context(sc),
    "textFile",
    location,
    max(as.integer(repartition %||% 1L), 1L)
  )

  # Hand the text RDD to Sedona's GeoJSON reader together with both
  # geometry-validity flags.
  geometry_rdd <- invoke_static(
    sc,
    "org.apache.sedona.core.formatMapper.GeoJsonReader",
    "readToGeometryRDD",
    lines_rdd,
    allow_invalid_geometries,
    skip_syntactically_invalid_geometries
  )

  # Apply the storage level, then wrap the result as an untyped spatial RDD.
  new_spatial_rdd(set_storage_level(geometry_rdd, storage_level), NULL)
}
#' @export
#' @rdname sedona_read_geojson
sedona_read_wkb <- function(sc,
                            location,
                            wkb_col_idx = 0L,
                            allow_invalid_geometries = TRUE,
                            skip_syntactically_invalid_geometries = TRUE,
                            storage_level = "MEMORY_ONLY",
                            repartition = 1L) {
  # Load the input through `textFile` on the Java context. The partition
  # count is floored at 1, and a NULL `repartition` counts as 1.
  lines_rdd <- invoke(
    java_context(sc),
    "textFile",
    location,
    max(as.integer(repartition %||% 1L), 1L)
  )

  # Hand the text RDD to Sedona's WKB reader; the column index is coerced to
  # integer before being passed across.
  geometry_rdd <- invoke_static(
    sc,
    "org.apache.sedona.core.formatMapper.WkbReader",
    "readToGeometryRDD",
    lines_rdd,
    as.integer(wkb_col_idx),
    allow_invalid_geometries,
    skip_syntactically_invalid_geometries
  )

  # Apply the storage level, then wrap the result as an untyped spatial RDD.
  new_spatial_rdd(set_storage_level(geometry_rdd, storage_level), NULL)
}
#' @export
#' @rdname sedona_read_geojson
sedona_read_wkt <- function(sc,
                            location,
                            wkt_col_idx = 0L,
                            allow_invalid_geometries = TRUE,
                            skip_syntactically_invalid_geometries = TRUE,
                            storage_level = "MEMORY_ONLY",
                            repartition = 1L) {
  # Load the input through `textFile` on the Java context. The partition
  # count is floored at 1, and a NULL `repartition` counts as 1.
  lines_rdd <- invoke(
    java_context(sc),
    "textFile",
    location,
    max(as.integer(repartition %||% 1L), 1L)
  )

  # Hand the text RDD to Sedona's WKT reader; the column index is coerced to
  # integer before being passed across.
  geometry_rdd <- invoke_static(
    sc,
    "org.apache.sedona.core.formatMapper.WktReader",
    "readToGeometryRDD",
    lines_rdd,
    as.integer(wkt_col_idx),
    allow_invalid_geometries,
    skip_syntactically_invalid_geometries
  )

  # Apply the storage level, then wrap the result as an untyped spatial RDD.
  new_spatial_rdd(set_storage_level(geometry_rdd, storage_level), NULL)
}
#' @rdname sedona_read_geojson
#' @export
sedona_read_shapefile <- function(sc,
                                  location,
                                  storage_level = "MEMORY_ONLY") {
  # Call the static `readToGeometryRDD` method of Sedona's ShapefileReader
  # with the Java context and the input location.
  geometry_rdd <- invoke_static(
    sc,
    "org.apache.sedona.core.formatMapper.shapefileParser.ShapefileReader",
    "readToGeometryRDD",
    java_context(sc),
    location
  )

  # Apply the storage level, then wrap the result as an untyped spatial RDD.
  new_spatial_rdd(set_storage_level(geometry_rdd, storage_level), NULL)
}
# ------- Read SDF ------------
#' Read geospatial data into a Spark DataFrame.
#'
#' @description Functions to read geospatial data from a variety of formats into Spark DataFrames.
#'
#' * `spark_read_shapefile`: from a shapefile
#' * `spark_read_geojson`: from a geojson file
#' * `spark_read_geoparquet`: from a geoparquet file
#'
#' @inheritParams sparklyr::spark_read_source
#'
#'
#' @return A tbl
#'
#' @examples
#' library(sparklyr)
#' library(apache.sedona)
#'
#' sc <- spark_connect(master = "spark://HOST:PORT")
#'
#' if (!inherits(sc, "test_connection")) {
#' input_location <- "/dev/null" # replace it with the path to your input file
#' sdf <- spark_read_shapefile(sc, path = input_location)
#' }
#'
#' @family Sedona DF data interface functions
#'
#' @export
spark_read_shapefile <- function(sc,
                                 name = NULL,
                                 path = name,
                                 options = list(),
                                 ...) {
  # This reader recognises no options, so every named entry is reported and
  # ignored. A plain loop with `call. = FALSE` keeps the warning free of the
  # internal `FUN(X[[i]], ...)` call that a side-effect `lapply()` attaches.
  for (option_name in names(options)) {
    if (nzchar(option_name)) {
      warning("Ignoring unknown option '", option_name, "'", call. = FALSE)
    }
  }

  # NOTE: arguments passed through `...` are not used in this function body.
  rdd <- sedona_read_shapefile(
    sc,
    location = path,
    storage_level = "MEMORY_ONLY"
  )

  # Register the spatial RDD under `name`.
  rdd %>% sdf_register(name = name)
}
#' @export
#' @rdname spark_read_shapefile
spark_read_geojson <- function(sc,
                               name = NULL,
                               path = name,
                               options = list(),
                               repartition = 0,
                               memory = TRUE,
                               overwrite = TRUE) {
  # Both recognised options default to TRUE when the caller does not supply
  # them in `options`.
  option_names <- names(options)
  final_allow_invalid <- if ("allow_invalid_geometries" %in% option_names) {
    options[["allow_invalid_geometries"]]
  } else {
    TRUE
  }
  final_skip <- if ("skip_syntactically_invalid_geometries" %in% option_names) {
    options[["skip_syntactically_invalid_geometries"]]
  } else {
    TRUE
  }

  # Report every other option and ignore it. A plain loop with
  # `call. = FALSE` keeps the warning free of the internal `FUN(X[[i]], ...)`
  # call that a side-effect `lapply()` attaches.
  known_options <- c(
    "allow_invalid_geometries",
    "skip_syntactically_invalid_geometries"
  )
  for (option_name in option_names[!option_names %in% known_options]) {
    warning("Ignoring unknown option '", option_name, "'", call. = FALSE)
  }

  # The default `repartition = 0` is raised to 1 partition here.
  final_repartition <- max(as.integer(repartition), 1L)

  # NOTE: `memory` and `overwrite` are accepted but not used in this body.
  rdd <- sedona_read_geojson(
    sc,
    location = path,
    allow_invalid_geometries = final_allow_invalid,
    skip_syntactically_invalid_geometries = final_skip,
    storage_level = "MEMORY_ONLY",
    repartition = final_repartition
  )

  # Register the spatial RDD under `name`.
  rdd %>% sdf_register(name = name)
}
#' @export
#' @rdname spark_read_shapefile
#' @importFrom sparklyr spark_read_source
spark_read_geoparquet <- function(sc,
                                  name = NULL,
                                  path = name,
                                  options = list(),
                                  repartition = 0,
                                  memory = TRUE,
                                  overwrite = TRUE) {
  # Delegate to sparklyr's generic source reader with the "geoparquet"
  # source. All caller arguments are forwarded unchanged and no column
  # specification is supplied.
  spark_read_source(
    sc,
    source = "geoparquet",
    name = name,
    path = path,
    options = options,
    repartition = repartition,
    memory = memory,
    overwrite = overwrite,
    columns = NULL
  )
}
# ------- Write RDD ------------
#' Write SpatialRDD into a file.
#'
#' Export serialized data from a Sedona SpatialRDD into a file.
#' * `sedona_write_wkb`: as WKB
#' * `sedona_write_wkt`: as WKT
#' * `sedona_write_geojson`: as GeoJSON
#'
#' @param x The SpatialRDD object.
#' @param output_location Location of the output file.
#'
#' @return No return value.
#'
#' @examples
#' library(sparklyr)
#' library(apache.sedona)
#'
#' sc <- spark_connect(master = "spark://HOST:PORT")
#'
#' if (!inherits(sc, "test_connection")) {
#' input_location <- "/dev/null" # replace it with the path to your input file
#' rdd <- sedona_read_wkb(
#' sc,
#' location = input_location,
#' wkb_col_idx = 0L
#' )
#' sedona_write_wkb(rdd, "/tmp/wkb_output.tsv")
#' }
#'
#' @family Sedona RDD data interface functions
#'
#' @export
sedona_write_wkb <- function(x, output_location) {
  # Call `saveAsWKB` on the Java object held by the SpatialRDD wrapper.
  rdd_jobj <- x$.jobj
  invoke(rdd_jobj, "saveAsWKB", output_location)
}
#' @export
#' @rdname sedona_write_wkb
sedona_write_wkt <- function(x, output_location) {
  # Call `saveAsWKT` on the Java object held by the SpatialRDD wrapper.
  rdd_jobj <- x$.jobj
  invoke(rdd_jobj, "saveAsWKT", output_location)
}
#' @export
#' @rdname sedona_write_wkb
sedona_write_geojson <- function(x, output_location) {
  # Call `saveAsGeoJSON` on the Java object held by the SpatialRDD wrapper.
  rdd_jobj <- x$.jobj
  invoke(rdd_jobj, "saveAsGeoJSON", output_location)
}
#' Save a Spark dataframe containing exactly 1 spatial column into a file.
#'
#' Export serialized data from a Spark dataframe containing exactly 1 spatial
#' column into a file.
#'
#' @param x A Spark dataframe object in sparklyr or a dplyr expression
#' representing a Spark SQL query.
#' @param spatial_col The name of the spatial column.
#' @param output_location Location of the output file.
#' @param output_format Format of the output.
#'
#' @return No return value.
#'
#' @examples
#' library(sparklyr)
#' library(apache.sedona)
#'
#' sc <- spark_connect(master = "spark://HOST:PORT")
#'
#' if (!inherits(sc, "test_connection")) {
#' tbl <- dplyr::tbl(
#' sc,
#' dplyr::sql("SELECT ST_GeomFromText('POINT(-71.064544 42.28787)') AS `pt`")
#' )
#' sedona_save_spatial_rdd(
#' tbl %>% dplyr::mutate(id = 1),
#' spatial_col = "pt",
#' output_location = "/tmp/pts.wkb",
#' output_format = "wkb"
#' )
#' }
#'
#' @family Sedona RDD data interface functions
#'
#' @export
sedona_save_spatial_rdd <- function(x,
                                    spatial_col,
                                    output_location,
                                    output_format = c("wkb", "wkt", "geojson")) {
  # Validate the requested format before converting anything, so an
  # unsupported value fails immediately rather than after the DataFrame has
  # already been turned into a SpatialRDD.
  output_format <- match.arg(output_format)

  # Dispatch to the writer function itself rather than to a name assembled
  # from strings and resolved at call time.
  write_rdd <- switch(output_format,
    wkb = sedona_write_wkb,
    wkt = sedona_write_wkt,
    geojson = sedona_write_geojson
  )

  spatial_rdd <- to_spatial_rdd(x, spatial_col)
  write_rdd(spatial_rdd, output_location)
}
# ------- Write SDF ------------
### No shapefile writer in Sedona
#' Write geospatial data from a Spark DataFrame.
#'
#' @description Functions to write geospatial data into a variety of formats from Spark DataFrames.
#'
#' * `spark_write_geojson`: to GeoJSON
#' * `spark_write_geoparquet`: to GeoParquet
#' * `spark_write_raster`: to raster tiles after using RS output functions (`RS_AsXXX`)
#'
#'
#' @param path The path to the file. Needs to be accessible from the cluster.
#' Supports the \samp{"hdfs://"}, \samp{"s3a://"} and \samp{"file://"} protocols.
#' @inheritParams sparklyr::spark_write_source
#'
#'
#' @return NULL
#'
#' @examples
#' library(sparklyr)
#' library(apache.sedona)
#'
#' sc <- spark_connect(master = "spark://HOST:PORT")
#'
#' if (!inherits(sc, "test_connection")) {
#' tbl <- dplyr::tbl(
#' sc,
#' dplyr::sql("SELECT ST_GeomFromText('POINT(-71.064544 42.28787)') AS `pt`")
#' )
#' spark_write_geojson(
#' tbl %>% dplyr::mutate(id = 1),
#' path = "/tmp/pts.geojson"
#' )
#' }
#'
#' @family Sedona DF data interface functions
#'
#' @importFrom sparklyr spark_write_source
#' @export
spark_write_geojson <- function(x,
                                path,
                                mode = NULL,
                                options = list(),
                                partition_by = NULL,
                                ...) {
  # NOTE: `mode`, `partition_by` and `...` are accepted but not used in this
  # function body.
  if ("spatial_col" %in% names(options)) {
    spatial_col <- options[["spatial_col"]]
  } else {
    # No column was named, so look for fields whose schema type is
    # "GeometryUDT". `vapply()` always yields a logical vector, even for an
    # empty schema, which keeps `which()` well-defined.
    schema <- x %>% sdf_schema()
    is_geometry <- vapply(
      schema,
      function(field) identical(field$type, "GeometryUDT"),
      logical(1)
    )
    potential_cols <- which(is_geometry)

    if (length(potential_cols) == 0) {
      cli::cli_abort("No geometry column found")
    } else if (length(potential_cols) > 1) {
      # With several candidates, use the first one and tell the caller.
      spatial_col <- names(potential_cols)[1]
      cli::cli_warn("Multiple geometry columns found, using {spatial_col}")
    } else {
      spatial_col <- names(potential_cols)
    }
  }

  rdd <- x %>% to_spatial_rdd(spatial_col = spatial_col)
  sedona_write_geojson(x = rdd, output_location = path)
}
#' @export
#' @rdname spark_write_geojson
#' @importFrom sparklyr spark_write_source
spark_write_geoparquet <- function(x,
                                   path,
                                   mode = NULL,
                                   options = list(),
                                   partition_by = NULL,
                                   ...) {
  # Delegate to sparklyr's generic source writer with the "geoparquet"
  # source; the output path is handed over as the single save argument.
  spark_write_source(
    x = x,
    source = "geoparquet",
    save_args = list(path),
    mode = mode,
    options = options,
    partition_by = partition_by,
    ...
  )
}
#' @export
#' @rdname spark_write_geojson
#' @importFrom sparklyr spark_write_source
spark_write_raster <- function(x,
                               path,
                               mode = NULL,
                               options = list(),
                               partition_by = NULL,
                               ...) {
  # Delegate to sparklyr's generic source writer with the "raster" source;
  # the output path is handed over as the single save argument.
  spark_write_source(
    x = x,
    source = "raster",
    save_args = list(path),
    mode = mode,
    options = options,
    partition_by = partition_by,
    ...
  )
}
# ------- Utilities ------------
rdd_cls_from_type <- function(type = c("point", "polygon", "linestring")) {
  # Build the fully qualified Sedona RDD class name for a geometry type,
  # e.g. "org.apache.sedona.core.spatialRDD.PointRDD". `match.arg()` reduces
  # `type` to one of the listed choices first.
  type <- match.arg(type)
  class_stem <- to_camel_case(type)
  paste0("org.apache.sedona.core.spatialRDD.", class_stem, "RDD")
}
to_camel_case <- function(type) {
  # Map a lower-case geometry type name to the capitalised form used in
  # Sedona class and method names. The switch has no default branch, so an
  # unrecognised string yields NULL.
  camel_cased <- switch(type,
    point = "Point",
    polygon = "Polygon",
    linestring = "LineString"
  )
  camel_cased
}
to_delimiter_enum_value <- function(sc, delimiter) {
  # Translate the delimiter character into the key under which the matching
  # enum value is stored on the connection state. Anything outside the list
  # is rejected by the default branch.
  enum_key <- switch(delimiter,
    "," = "csv",
    "\t" = "tsv",
    "?" = "questionmark",
    "'" = "singlequote",
    "\"" = "quote",
    "_" = "underscore",
    "-" = "dash",
    "%" = "percent",
    "~" = "tilde",
    "|" = "pipe",
    ";" = "semicolon",
    stop("Unsupported delimiter '", delimiter, "'")
  )

  # Look the key up in the delimiter enums held in `sc$state`.
  delimiter_enums <- sc$state$enums$delimiter
  delimiter_enums[[enum_key]]
}
set_storage_level <- function(rdd, storage_level) {
  conn <- spark_connection(rdd)

  # Reuse the StorageLevel object cached on the connection state under this
  # name, if there is one.
  level_obj <- conn$state$object_cache$storage_levels[[storage_level]]
  if (is.null(level_obj)) {
    # Cache miss: obtain the object by invoking the static member of
    # org.apache.spark.storage.StorageLevel named by `storage_level`, and
    # store it for later calls.
    level_obj <- invoke_static(
      conn, "org.apache.spark.storage.StorageLevel", storage_level
    )
    conn$state$object_cache$storage_levels[[storage_level]] <- level_obj
  }

  # Call `analyze` on the RDD with the resolved storage level, then return
  # the same RDD handle so the call can be used in a pipe.
  invoke(rdd, "analyze", level_obj)
  rdd
}
Any scripts or data that you put into this service are public.
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.