Skip to content

Commit

Permalink
added raster tests to sparklyr bindings
Browse files Browse the repository at this point in the history
  • Loading branch information
sllynn committed Feb 2, 2024
1 parent a5998b7 commit 0d49847
Show file tree
Hide file tree
Showing 18 changed files with 254 additions and 63 deletions.
2 changes: 1 addition & 1 deletion R/install_deps.R
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
options(repos = c(CRAN = "https://packagemanager.posit.co/cran/__linux__/jammy/latest"))

install.packages(c("pkgbuild", "testthat", "roxygen2", "sparklyr"))
install.packages(c("pkgbuild", "testthat", "roxygen2", "sparklyr", "readr", "sparklyr.nested"))
1 change: 1 addition & 0 deletions R/sparkR-mosaic/sparkrMosaic/DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,5 @@ Imports:
methods
Suggests:
testthat (>= 3.0.0)
readr (>= 2.1.5)
Config/testthat/edition: 3
Binary file not shown.
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
inputGJ = '{
{
"type":"Feature",
"properties":{
"shape_area":"0.0000607235737749",
Expand Down Expand Up @@ -225,4 +225,4 @@ inputGJ = '{
]
]
}
}'
}
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
source("data.R")

test_that("scalar vector functions behave as intended", {
sdf <- SparkR::createDataFrame(
data.frame(
Expand Down
1 change: 1 addition & 0 deletions R/sparklyr-mosaic/sparklyrMosaic/DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,5 @@ Imports:
sparklyr
Suggests:
testthat (>= 3.0.0)
readr (>= 2.1.5)
Config/testthat/edition: 3
Binary file not shown.
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
inputGJ = '{
{
"type":"Feature",
"properties":{
"shape_area":"0.0000607235737749",
Expand Down Expand Up @@ -225,4 +225,4 @@ inputGJ = '{
]
]
}
}'
}
Binary file not shown.
181 changes: 181 additions & 0 deletions R/sparklyr-mosaic/sparklyrMosaic/tests/testthat/testRasterFunctions.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
generate_singleband_raster_df <- function() {
spark_read_source(
sc,
name = "raster",
source = "gdal",
path = "data/MCD43A4.A2018185.h10v07.006.2018194033728_B04.TIF",
options = list("raster.read.strategy" = "in_memory")
)
}


test_that("mosaic can read single-band GeoTiff", {
sdf <- generate_singleband_raster_df()
row <- sdf %>% head(1) %>% sdf_collect
expect_equal(row$length, 1067862L)
expect_equal(row$x_size, 2400)
expect_equal(row$y_size, 2400)
expect_equal(row$srid, 0)
expect_equal(row$bandCount, 1)
expect_equal(row$metadata[[1]]$LONGNAME, "MODIS/Terra+Aqua BRDF/Albedo Nadir BRDF-Adjusted Ref Daily L3 Global - 500m")
expect_equal(row$tile[[1]]$driver, "GTiff")

})


test_that("scalar raster functions behave as intended", {
sdf <- generate_singleband_raster_df() %>%
mutate(rst_bandmetadata = rst_bandmetadata(tile, 1L)) %>%
mutate(rst_boundingbox = rst_boundingbox(tile)) %>%
mutate(rst_boundingbox = st_buffer(rst_boundingbox, -0.001)) %>%
mutate(rst_clip = rst_clip(tile, rst_boundingbox)) %>%
mutate(rst_combineavg = rst_combineavg(array(tile, rst_clip))) %>%
mutate(rst_frombands = rst_frombands(array(tile, tile))) %>%
mutate(rst_fromfile = rst_fromfile(path, -1L)) %>%
mutate(rst_georeference = rst_georeference(tile)) %>%
mutate(rst_getnodata = rst_getnodata(tile)) %>%
mutate(rst_subdatasets = rst_subdatasets(tile)) %>%
mutate(rst_height = rst_height(tile)) %>%
mutate(rst_initnodata = rst_initnodata(tile)) %>%
mutate(rst_isempty = rst_isempty(tile)) %>%
mutate(rst_memsize = rst_memsize(tile)) %>%
mutate(rst_merge = rst_merge(array(tile, tile))) %>%
mutate(rst_metadata = rst_metadata(tile)) %>%
mutate(rst_ndvi = rst_ndvi(tile, 1L, 1L)) %>%
mutate(rst_numbands = rst_numbands(tile)) %>%
mutate(rst_pixelheight = rst_pixelheight(tile)) %>%
mutate(rst_pixelwidth = rst_pixelwidth(tile))

# breaking the chain here to avoid memory issues
expect_no_error(spark_write_source(sdf, "noop", mode = "overwrite"))

sdf <- generate_singleband_raster_df() %>%
mutate(rst_rastertogridavg = rst_rastertogridavg(tile, 9L)) %>%
mutate(rst_rastertogridcount = rst_rastertogridcount(tile, 9L)) %>%
mutate(rst_rastertogridmax = rst_rastertogridmax(tile, 9L)) %>%
mutate(rst_rastertogridmedian = rst_rastertogridmedian(tile, 9L)) %>%
mutate(rst_rastertogridmin = rst_rastertogridmin(tile, 9L)) %>%
mutate(rst_rastertoworldcoordx = rst_rastertoworldcoordx(tile, 1200L, 1200L)) %>%
mutate(rst_rastertoworldcoordy = rst_rastertoworldcoordy(tile, 1200L, 1200L)) %>%
mutate(rst_rastertoworldcoord = rst_rastertoworldcoord(tile, 1200L, 1200L)) %>%
mutate(rst_rotation = rst_rotation(tile)) %>%
mutate(rst_scalex = rst_scalex(tile)) %>%
mutate(rst_scaley = rst_scaley(tile)) %>%
mutate(rst_srid = rst_srid(tile)) %>%
mutate(rst_summary = rst_summary(tile)) %>%
mutate(rst_upperleftx = rst_upperleftx(tile)) %>%
mutate(rst_upperlefty = rst_upperlefty(tile)) %>%
mutate(rst_width = rst_width(tile)) %>%
mutate(rst_worldtorastercoordx = rst_worldtorastercoordx(tile, as.double(0.0), as.double(0.0))) %>%
mutate(rst_worldtorastercoordy = rst_worldtorastercoordy(tile, as.double(0.0), as.double(0.0))) %>%
mutate(rst_worldtorastercoord = rst_worldtorastercoord(tile, as.double(0.0), as.double(0.0)))

expect_no_error(spark_write_source(sdf, "noop", mode = "overwrite"))
})

test_that("raster flatmap functions behave as intended", {
retiled_sdf <- generate_singleband_raster_df() %>%
mutate(rst_retile = rst_retile(tile, 1200L, 1200L))

expect_no_error(spark_write_source(retiled_sdf, "noop", mode = "overwrite"))
expect_equal(sdf_nrow(retiled_sdf), 4)

subdivide_sdf <- generate_singleband_raster_df() %>%
mutate(rst_subdivide = rst_subdivide(tile, 1L))

expect_no_error(spark_write_source(subdivide_sdf, "noop", mode = "overwrite"))
expect_equal(sdf_nrow(subdivide_sdf), 4)

tessellate_sdf <- generate_singleband_raster_df() %>%
mutate(rst_tessellate = rst_tessellate(tile, 3L))

expect_no_error(spark_write_source(tessellate_sdf, "noop", mode = "overwrite"))
expect_equal(sdf_nrow(tessellate_sdf), 140)

overlap_sdf <- generate_singleband_raster_df() %>%
mutate(rst_to_overlapping_tiles = rst_to_overlapping_tiles(tile, 200L, 200L, 10L))

expect_no_error(spark_write_source(overlap_sdf, "noop", mode = "overwrite"))
expect_equal(sdf_nrow(overlap_sdf), 196)

})

test_that("raster aggregation functions behave as intended", {
collection_sdf <- generate_singleband_raster_df() %>%
mutate(extent = st_astext(rst_boundingbox(tile))) %>%
mutate(tile = rst_to_overlapping_tiles(tile, 200L, 200L, 10L))

merge_sdf <- collection_sdf %>%
group_by(path) %>%
summarise(tile = rst_merge_agg(tile)) %>%
mutate(extent = st_astext(rst_boundingbox(tile)))

expect_equal(sdf_nrow(merge_sdf), 1)
expect_equal(
collection_sdf %>% head(1) %>% collect %>% .$extent,
merge_sdf %>% head(1) %>% collect %>% .$extent
)

combine_avg_sdf <- collection_sdf %>%
group_by(path) %>%
summarise(tile = rst_combineavg_agg(tile)) %>%
mutate(extent = st_astext(rst_boundingbox(tile)))

expect_equal(sdf_nrow(combine_avg_sdf), 1)
expect_equal(
collection_sdf %>% head(1) %>% collect %>% .$extent,
combine_avg_sdf %>% head(1) %>% collect %>% .$extent
)

})

test_that("the tessellate-join-clip-merge flow works on NetCDF files", {
target_resolution <- 1L

region_keys <- c("NAME", "STATE", "BOROUGH", "BLOCK", "TRACT")

census_sdf <- spark_read_source(
sc,
name = "census_raw",
source = "com.databricks.labs.mosaic.datasource.OGRFileFormat",
path = "data/Blocks2020.zip",
options = list(
"vsizip" = "true",
"chunkSize" = "20"
)
) %>%
select(region_keys, geom_0, geom_0_srid) %>%
distinct() %>%
mutate(geom_0 = st_simplify(geom_0, as.double(0.001))) %>%
mutate(geom_0 = st_updatesrid(geom_0, geom_0_srid, 4326L)) %>%
mutate(chip = grid_tessellateexplode(geom_0, target_resolution)) %>%
sdf_select(region_keys, chip$is_core, chip$index_id, chip$wkb)

raster_sdf <-
spark_read_source(
sc,
name = "raster_raw",
source = "gdal",
path = "data/prAdjust_day_HadGEM2-CC_SMHI-DBSrev930-GFD-1981-2010-postproc_rcp45_r1i1p1_20201201-20201231.nc",
options = list("raster.read.strategy" = "retile_on_read")
) %>%
mutate(tile = rst_separatebands(tile)) %>%
sdf_register("raster")

indexed_raster_sdf <- sdf_sql(sc, "SELECT tile, element_at(rst_metadata(tile), 'NC_GLOBAL#GDAL_MOSAIC_BAND_INDEX') as timestep FROM raster") %>%
filter(timestep == 21L) %>%
mutate(tile = rst_setsrid(tile, 4326L)) %>%
mutate(tile = rst_to_overlapping_tiles(tile, 20L, 20L, 10L)) %>%
mutate(tile = rst_tessellate(tile, target_resolution))

clipped_sdf <- indexed_raster_sdf %>%
sdf_select(tile, tile.index_id, timestep, .drop_parents = FALSE) %>%
inner_join(census_sdf, by = "index_id") %>%
mutate(tile = rst_clip(tile, wkb))

merged_precipitation <- clipped_sdf %>%
group_by(region_keys, timestep) %>%
summarise(tile = rst_merge_agg(tile))

expect_equal(sdf_nrow(merged_precipitation), 1)
})
101 changes: 56 additions & 45 deletions R/sparklyr-mosaic/sparklyrMosaic/tests/testthat/testVectorFunctions.R
Original file line number Diff line number Diff line change
@@ -1,62 +1,72 @@
source("data.R")
options(warn = -1)

test_that("scalar vector functions behave as intended", {

sdf <- sdf_copy_to(
sdf_raw <- sdf_copy_to(
sc,
data.frame(
wkt = "POLYGON ((2 1, 1 2, 2 3, 2 1))",
point_wkt = "POINT (1 1)"
)
)

sdf <- mutate(sdf, "st_area" = st_area(wkt))
sdf <- mutate(sdf, "st_length" = st_length(wkt))
sdf <- mutate(sdf, "st_perimeter" = st_perimeter(wkt))
sdf <- mutate(sdf, "st_buffer" = st_buffer(wkt, as.double(1.1)))
sdf <- mutate(sdf, "st_bufferloop" = st_bufferloop(wkt, as.double(1.1), as.double(1.2)))
sdf <- mutate(sdf, "st_convexhull" = st_convexhull(wkt))
sdf <- mutate(sdf, "st_dump" = st_dump(wkt))
sdf <- mutate(sdf, "st_translate" = st_translate(wkt, 1L, 1L))
sdf <- mutate(sdf, "st_scale" = st_scale(wkt, 1L, 1L))
sdf <- mutate(sdf, "st_rotate" = st_rotate(wkt, 1L))
sdf <- mutate(sdf, "st_centroid" = st_centroid(wkt))
sdf <- mutate(sdf, "st_numpoints" = st_numpoints(wkt))
sdf <- mutate(sdf, "st_haversine" = st_haversine(as.double(0.0), as.double(90.0), as.double(0.0), as.double(0.0)))
sdf <- mutate(sdf, "st_isvalid" = st_isvalid(wkt))
sdf <- mutate(sdf, "st_hasvalidcoordinates" = st_hasvalidcoordinates(wkt, "EPSG:2192", "bounds"))
sdf <- mutate(sdf, "st_intersects" = st_intersects(wkt, wkt))
sdf <- mutate(sdf, "st_intersection" = st_intersection(wkt, wkt))
sdf <- mutate(sdf, "st_envelope" = st_envelope(wkt))
sdf <- mutate(sdf, "st_simplify" = st_simplify(wkt, as.double(0.001)))
sdf <- mutate(sdf, "st_difference" = st_difference(wkt, wkt))
sdf <- mutate(sdf, "st_union" = st_union(wkt, wkt))
sdf <- mutate(sdf, "st_unaryunion" = st_unaryunion(wkt))
sdf <- mutate(sdf, "st_geometrytype" = st_geometrytype(wkt))
sdf <- mutate(sdf, "st_xmin" = st_xmin(wkt))
sdf <- mutate(sdf, "st_xmax" = st_xmax(wkt))
sdf <- mutate(sdf, "st_ymin" = st_ymin(wkt))
sdf <- mutate(sdf, "st_ymax" = st_ymax(wkt))
sdf <- mutate(sdf, "st_zmin" = st_zmin(wkt))
sdf <- mutate(sdf, "st_zmax" = st_zmax(wkt))
sdf <- mutate(sdf, "flatten_polygons" = flatten_polygons(wkt))
sdf <- sdf_raw %>% mutate(
st_area = st_area(wkt),
st_length = st_length(wkt),
st_perimeter = st_perimeter(wkt),
st_buffer = st_buffer(wkt, as.double(1.1)),
st_bufferloop = st_bufferloop(wkt, as.double(1.1), as.double(1.2)),
st_convexhull = st_convexhull(wkt),
st_dump = st_dump(wkt),
st_translate = st_translate(wkt, 1L, 1L),
st_scale = st_scale(wkt, 1L, 1L),
st_rotate = st_rotate(wkt, 1L),
st_centroid = st_centroid(wkt),
st_numpoints = st_numpoints(wkt),
st_haversine = st_haversine(as.double(0.0), as.double(90.0), as.double(0.0), as.double(0.0)),
st_isvalid = st_isvalid(wkt),
st_hasvalidcoordinates = st_hasvalidcoordinates(wkt, "EPSG:2192", "bounds"),
st_intersects = st_intersects(wkt, wkt),
st_intersection = st_intersection(wkt, wkt),
st_envelope = st_envelope(wkt),
st_simplify = st_simplify(wkt, as.double(0.001)),
st_difference = st_difference(wkt, wkt),
st_union = st_union(wkt, wkt),
st_unaryunion = st_unaryunion(wkt),
st_geometrytype = st_geometrytype(wkt),
st_xmin = st_xmin(wkt),
st_xmax = st_xmax(wkt),
st_ymin = st_ymin(wkt),
st_ymax = st_ymax(wkt),
st_zmin = st_zmin(wkt),
st_zmax = st_zmax(wkt)
)

expect_no_error(spark_write_source(sdf, "noop", mode = "overwrite"))
expect_equal(sdf_nrow(sdf), 1)

# SRID functions

sdf <- mutate(sdf, "geom_with_srid" = st_setsrid(st_geomfromwkt(wkt), 4326L))
sdf <- mutate(sdf, "srid_check" = st_srid(geom_with_srid))
sdf <- mutate(sdf, "transformed_geom" = st_transform(geom_with_srid, 3857L))
sdf <- sdf_raw %>% mutate(
geom_with_srid = st_setsrid(st_geomfromwkt(wkt), 4326L),
srid_check = st_srid(geom_with_srid),
transformed_geom = st_transform(geom_with_srid, 3857L)
)

expect_no_error(spark_write_source(sdf, "noop", mode = "overwrite"))
expect_equal(sdf_nrow(sdf), 1)

# Grid functions

sdf <- mutate(sdf, "grid_longlatascellid" = grid_longlatascellid(as.double(1L), as.double(1L), 1L))
sdf <- mutate(sdf, "grid_pointascellid" = grid_pointascellid(point_wkt, 1L))
sdf <- mutate(sdf, "grid_boundaryaswkb" = grid_boundaryaswkb(grid_longlatascellid))
sdf <- mutate(sdf, "grid_polyfill" = grid_polyfill(wkt, 1L))
sdf <- mutate(sdf, "grid_tessellateexplode" = grid_tessellateexplode(wkt, 1L))
sdf <- mutate(sdf, "grid_tessellateexplode_no_core_chips" = grid_tessellateexplode(wkt, 1L, FALSE))
sdf <- mutate(sdf, "grid_tessellate" = grid_tessellate(wkt, 1L))
sdf <- mutate(sdf, "grid_cellarea" = grid_cellarea(grid_longlatascellid))
sdf <- sdf_raw %>% mutate(
grid_longlatascellid = grid_longlatascellid(as.double(1L), as.double(1L), 1L),
grid_pointascellid = grid_pointascellid(point_wkt, 1L),
grid_boundaryaswkb = grid_boundaryaswkb(grid_longlatascellid),
grid_polyfill = grid_polyfill(wkt, 1L),
grid_tessellateexplode = grid_tessellateexplode(wkt, 1L),
grid_tessellate = grid_tessellate(wkt, 1L),
grid_cellarea = grid_cellarea(grid_longlatascellid)
)

expect_no_error(spark_write_source(sdf, "noop", mode = "overwrite"))
expect_equal(sdf_nrow(sdf), 1)
Expand All @@ -65,6 +75,7 @@ test_that("scalar vector functions behave as intended", {

test_that("aggregate vector functions behave as intended", {

inputGJ <- read_file("data/boroughs.geojson")
sdf <- sdf_sql(sc, "SELECT id as location_id FROM range(1)") %>%
mutate(geometry = st_geomfromgeojson(inputGJ))
expect_equal(sdf_nrow(sdf), 1)
Expand All @@ -90,8 +101,8 @@ test_that("aggregate vector functions behave as intended", {

sdf.intersection <- sdf.l %>%
inner_join(sdf.r, by = c("left_index" = "right_index"), keep = TRUE) %>%
dplyr::group_by(left_id, right_id) %>%
dplyr::summarise(
group_by(left_id, right_id) %>%
summarise(
agg_intersects = st_intersects_agg(left_index, right_index),
agg_intersection = st_intersection_agg(left_index, right_index),
left_geom = max(left_geom, 1),
Expand Down
14 changes: 6 additions & 8 deletions R/sparklyr-mosaic/tests.R
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
library(testthat)

if(length(getOption("repos")) < 1) {
options(repos = c(
CRAN = "https://cloud.r-project.org"
))
}
options(warn = -1)

install.packages("sparklyr", repos="")
library(testthat)
library(dplyr)
library(readr)
library(sparklyr)
library(sparklyr.nested)

spark_home <- Sys.getenv("SPARK_HOME")
spark_home_set(spark_home)

install.packages("sparklyrMosaic_0.4.0.tar.gz", repos = NULL)
library(sparklyrMosaic)

Expand Down
Loading

0 comments on commit 0d49847

Please sign in to comment.