Skip to content

Commit

Permalink
Merge pull request databrickslabs#460 from databrickslabs/scala/fix/n…
Browse files Browse the repository at this point in the history
…etcdf

Fix to netcdf read process
  • Loading branch information
Milos Colic authored Feb 13, 2024
2 parents 398ef5c + 32d6f76 commit a7ab70b
Show file tree
Hide file tree
Showing 84 changed files with 1,472 additions and 454 deletions.
12 changes: 12 additions & 0 deletions .github/actions/r_build/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,18 @@ runs:
shell: bash
run: |
sudo apt-get update && sudo apt-get install -y curl libcurl4-openssl-dev pkg-config libharfbuzz-dev libfribidi-dev
- name: Configure python interpreter
uses: actions/setup-python@v2
with:
python-version: ${{ matrix.python }}
- name: Install python dependencies
shell: bash
run: |
# - install pip libs
# note: gdal requires the extra args
cd python
pip install build wheel pyspark==${{ matrix.spark }} numpy==${{ matrix.numpy }}
pip install --no-build-isolation --no-cache-dir --force-reinstall gdal==${{ matrix.gdal }}
- name: Create download location for Spark
shell: bash
run: |
Expand Down
6 changes: 3 additions & 3 deletions R/generate_R_bindings.R
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ main <- function(scala_file_path){
closeAllConnections()

# supplementary files
sparkr_supplementary_files <- c("sparkR-mosaic/enableMosaic.R")
sparkr_supplementary_files <- c("sparkR-mosaic/enableMosaic.R", "sparkR-mosaic/enableGDAL.R")
copy_supplementary_file(sparkr_supplementary_files, "sparkR-mosaic/sparkrMosaic/R")

##########################
Expand All @@ -226,8 +226,8 @@ main <- function(scala_file_path){
closeAllConnections()

# supplementary files
sparkr_supplementary_files <- c("sparklyr-mosaic/enableMosaic.R", "sparklyr-mosaic/sparkFunctions.R")
copy_supplementary_file(sparkr_supplementary_files, "sparklyr-mosaic/sparklyrMosaic/R/")
sparklyr_supplementary_files <- c("sparklyr-mosaic/enableMosaic.R", "sparklyr-mosaic/sparkFunctions.R", "sparklyr-mosaic/enableGDAL.R")
copy_supplementary_file(sparklyr_supplementary_files, "sparklyr-mosaic/sparklyrMosaic/R/")
}


Expand Down
2 changes: 1 addition & 1 deletion R/install_deps.R
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
options(repos = c(CRAN = "https://packagemanager.posit.co/cran/__linux__/jammy/latest"))

install.packages(c("pkgbuild", "testthat", "roxygen2", "sparklyr"))
install.packages(c("pkgbuild", "testthat", "roxygen2", "sparklyr", "readr", "sparklyr.nested"))
14 changes: 14 additions & 0 deletions R/sparkR-mosaic/enableGDAL.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#' enableGDAL
#'
#' @description Activates the GDAL extensions for Mosaic by invoking the static
#' `enableGDAL` method of `com.databricks.labs.mosaic.gdal.MosaicGDAL`, passing
#' it the session returned by `sparkR.session()`.
#' @name enableGDAL
#' @rdname enableGDAL
#' @return None
#' @export enableGDAL
#' @examples
#' \dontrun{
#' enableGDAL() }
enableGDAL <- function() {
  # Fully qualified JVM class that exposes the static entry point.
  gdal_class <- "com.databricks.labs.mosaic.gdal.MosaicGDAL"
  sparkR.callJStatic(x = gdal_class, methodName = "enableGDAL", sparkR.session())
}
2 changes: 1 addition & 1 deletion R/sparkR-mosaic/sparkrMosaic/.Rbuildignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
^sparkrMosaic\.Rproj$
^\.Rproj\.user$
^\.Rproj\.user$
12 changes: 7 additions & 5 deletions R/sparkR-mosaic/sparkrMosaic/DESCRIPTION
Original file line number Diff line number Diff line change
@@ -1,21 +1,23 @@
Package: sparkrMosaic
Title: SparkR bindings for Databricks Mosaic
Version: 0.4.0
Authors@R:
Authors@R:
person("Robert", "Whiffin", , "[email protected]", role = c("aut", "cre")
)
Description: This package extends SparkR to bring the Databricks Mosaic for geospatial processing APIs into SparkR.
License: Databricks
Encoding: UTF-8
Roxygen: list(markdown = TRUE)
RoxygenNote: 7.2.3
Collate:
Collate:
'enableGDAL.R'
'enableMosaic.R'
'generics.R'
'functions.R'
Imports:
SparkR,
methods
Suggests:
testthat (>= 3.0.0)
Config/testthat/edition: 3
Suggests:
testthat (>= 3.0.0),
readr (>= 2.1.5)
Config/testthat/edition: 3
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,4 @@ LineEndingConversion: Posix
BuildType: Package
PackageUseDevtools: Yes
PackageInstallArgs: --no-multiarch --with-keep.source
PackageRoxygenize: rd,collate,namespace
PackageRoxygenize: rd,collate,namespace
Binary file not shown.
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
inputGJ = '{
{
"type":"Feature",
"properties":{
"shape_area":"0.0000607235737749",
Expand Down Expand Up @@ -225,4 +225,4 @@ inputGJ = '{
]
]
}
}'
}
Binary file not shown.
140 changes: 140 additions & 0 deletions R/sparkR-mosaic/sparkrMosaic/tests/testthat/testRasterFunctions.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
# Reads the single-band GeoTIFF test fixture through the "gdal" data source
# with the "in_memory" raster read strategy and returns the resulting
# SparkDataFrame. Each call performs a fresh read.df().
generate_singleband_raster_df <- function() {
  tif_path <- "sparkrMosaic/tests/testthat/data/MCD43A4.A2018185.h10v07.006.2018194033728_B04.TIF"
  read.df(path = tif_path, source = "gdal", raster.read.strategy = "in_memory")
}

test_that("mosaic can read single-band GeoTiff", {
  # Only the first row of the loaded raster frame is inspected.
  first_row <- first(generate_singleband_raster_df())

  # Scalar columns produced by the reader.
  expect_equal(first_row$length, 1067862L)
  expect_equal(first_row$x_size, 2400)
  expect_equal(first_row$y_size, 2400)
  expect_equal(first_row$srid, 0)
  expect_equal(first_row$bandCount, 1)

  # `metadata` and `tile` are unwrapped with [[1]] before field access.
  raster_metadata <- first_row$metadata[[1]]
  expect_equal(
    raster_metadata$LONGNAME,
    "MODIS/Terra+Aqua BRDF/Albedo Nadir BRDF-Adjusted Ref Daily L3 Global - 500m"
  )

  raster_tile <- first_row$tile[[1]]
  expect_equal(raster_tile$driver, "GTiff")
})

test_that("scalar raster functions behave as intended", {
  # Column expressions shared by the calls below.
  tile <- column("tile")
  grid_res <- lit(9L)
  pixel <- lit(1200L)
  origin <- lit(0.0)

  sdf <- generate_singleband_raster_df()

  # Raster-to-grid summaries at resolution 9.
  sdf <- withColumn(sdf, "rst_rastertogridavg", rst_rastertogridavg(tile, grid_res))
  sdf <- withColumn(sdf, "rst_rastertogridcount", rst_rastertogridcount(tile, grid_res))
  sdf <- withColumn(sdf, "rst_rastertogridmax", rst_rastertogridmax(tile, grid_res))
  sdf <- withColumn(sdf, "rst_rastertogridmedian", rst_rastertogridmedian(tile, grid_res))
  sdf <- withColumn(sdf, "rst_rastertogridmin", rst_rastertogridmin(tile, grid_res))

  # Raster -> world coordinate conversions for pixel (1200, 1200).
  sdf <- withColumn(sdf, "rst_rastertoworldcoordx", rst_rastertoworldcoordx(tile, pixel, pixel))
  sdf <- withColumn(sdf, "rst_rastertoworldcoordy", rst_rastertoworldcoordy(tile, pixel, pixel))
  sdf <- withColumn(sdf, "rst_rastertoworldcoord", rst_rastertoworldcoord(tile, pixel, pixel))

  # Single-argument accessors on the tile.
  sdf <- withColumn(sdf, "rst_rotation", rst_rotation(tile))
  sdf <- withColumn(sdf, "rst_scalex", rst_scalex(tile))
  sdf <- withColumn(sdf, "rst_scaley", rst_scaley(tile))
  sdf <- withColumn(sdf, "rst_srid", rst_srid(tile))
  sdf <- withColumn(sdf, "rst_summary", rst_summary(tile))
  sdf <- withColumn(sdf, "rst_upperleftx", rst_upperleftx(tile))
  sdf <- withColumn(sdf, "rst_upperlefty", rst_upperlefty(tile))
  sdf <- withColumn(sdf, "rst_width", rst_width(tile))

  # World -> raster coordinate conversions for (0.0, 0.0).
  sdf <- withColumn(sdf, "rst_worldtorastercoordx", rst_worldtorastercoordx(tile, origin, origin))
  sdf <- withColumn(sdf, "rst_worldtorastercoordy", rst_worldtorastercoordy(tile, origin, origin))
  sdf <- withColumn(sdf, "rst_worldtorastercoord", rst_worldtorastercoord(tile, origin, origin))

  # Writing to the "noop" source forces every added column to be evaluated.
  expect_no_error(write.df(sdf, source = "noop", mode = "overwrite"))
})

test_that("raster flatmap functions behave as intended", {
  # Adds `col_expr` as column `col_name` to a freshly loaded raster frame,
  # checks the frame can be written to the "noop" source without error, and
  # checks the resulting row count.
  expect_flatmap_rows <- function(col_name, col_expr, expected_rows) {
    flat_sdf <- generate_singleband_raster_df()
    flat_sdf <- withColumn(flat_sdf, col_name, col_expr)

    expect_no_error(write.df(flat_sdf, source = "noop", mode = "overwrite"))
    expect_equal(nrow(flat_sdf), expected_rows)
  }

  expect_flatmap_rows(
    "rst_retile",
    rst_retile(column("tile"), lit(1200L), lit(1200L)),
    4
  )

  expect_flatmap_rows(
    "rst_subdivide",
    rst_subdivide(column("tile"), lit(1L)),
    4
  )

  expect_flatmap_rows(
    "rst_tessellate",
    rst_tessellate(column("tile"), lit(3L)),
    66
  )

  expect_flatmap_rows(
    "rst_to_overlapping_tiles",
    rst_to_overlapping_tiles(column("tile"), lit(200L), lit(200L), lit(10L)),
    87
  )
})

test_that("raster aggregation functions behave as intended", {
  # Adds an "extent" column holding the WKT of the tile's bounding box.
  with_extent <- function(sdf) {
    withColumn(sdf, "extent", st_astext(rst_boundingbox(column("tile"))))
  }

  # Groups by "path" and applies `agg_col`, naming the result "tile".
  aggregate_by_path <- function(sdf, agg_col) {
    summarize(groupBy(sdf, "path"), alias(agg_col, "tile"))
  }

  # The extent is recorded before the raster is split into overlapping tiles.
  collection_sdf <- with_extent(generate_singleband_raster_df())
  collection_sdf <- withColumn(
    collection_sdf,
    "tile",
    rst_to_overlapping_tiles(column("tile"), lit(200L), lit(200L), lit(10L))
  )

  # Merging the tiles should give one row with the original extent.
  merge_sdf <- with_extent(
    aggregate_by_path(collection_sdf, rst_merge_agg(column("tile")))
  )

  expect_equal(nrow(merge_sdf), 1)
  expect_equal(first(collection_sdf)$extent, first(merge_sdf)$extent)

  # The averaging aggregate is held to the same two expectations.
  combine_avg_sdf <- with_extent(
    aggregate_by_path(collection_sdf, rst_combineavg_agg(column("tile")))
  )

  expect_equal(nrow(combine_avg_sdf), 1)
  expect_equal(first(collection_sdf)$extent, first(combine_avg_sdf)$extent)
})

test_that("the tessellate-join-clip-merge flow works on NetCDF files", {
  grid_res <- 1L
  keys <- c("NAME", "STATE", "BOROUGH", "BLOCK", "TRACT")

  blocks_path <- "sparkrMosaic/tests/testthat/data/Blocks2020.zip"
  netcdf_path <- "sparkrMosaic/tests/testthat/data/prAdjust_day_HadGEM2-CC_SMHI-DBSrev930-GFD-1981-2010-postproc_rcp45_r1i1p1_20201201-20201231.nc"

  # --- Vector side: read the zipped blocks file and tessellate the geometries.
  blocks_sdf <- read.df(
    path = blocks_path,
    source = "com.databricks.labs.mosaic.datasource.OGRFileFormat",
    vsizip = "true",
    chunkSize = "20"
  )
  blocks_sdf <- distinct(select(blocks_sdf, c(keys, "geom_0", "geom_0_srid")))
  blocks_sdf <- withColumn(
    blocks_sdf, "geom_0",
    st_simplify(column("geom_0"), lit(0.001))
  )
  blocks_sdf <- withColumn(
    blocks_sdf, "geom_0",
    st_updatesrid(column("geom_0"), column("geom_0_srid"), lit(4326L))
  )
  blocks_sdf <- withColumn(
    blocks_sdf, "chip",
    grid_tessellateexplode(column("geom_0"), lit(grid_res))
  )
  blocks_sdf <- select(blocks_sdf, c(keys, "chip.*"))

  # --- Raster side: read the NetCDF file, split bands, keep one timestep,
  # then retile and tessellate at the same grid resolution.
  precip_sdf <- read.df(
    path = netcdf_path,
    source = "gdal",
    raster.read.strategy = "retile_on_read"
  )
  precip_sdf <- withColumn(precip_sdf, "tile", rst_separatebands(column("tile")))
  precip_sdf <- withColumn(
    precip_sdf, "timestep",
    element_at(rst_metadata(column("tile")), "NC_GLOBAL#GDAL_MOSAIC_BAND_INDEX")
  )
  precip_sdf <- where(precip_sdf, "timestep = 21")
  precip_sdf <- withColumn(precip_sdf, "tile", rst_setsrid(column("tile"), lit(4326L)))
  precip_sdf <- withColumn(
    precip_sdf, "tile",
    rst_to_overlapping_tiles(column("tile"), lit(20L), lit(20L), lit(10L))
  )
  precip_sdf <- withColumn(
    precip_sdf, "tile",
    rst_tessellate(column("tile"), lit(grid_res))
  )

  # --- Join on the grid index id, clip each tile by the matching chip
  # geometry, and merge the clipped tiles per timestep.
  joined_sdf <- join(
    precip_sdf,
    blocks_sdf,
    precip_sdf$tile.index_id == blocks_sdf$index_id
  )
  joined_sdf <- withColumn(joined_sdf, "tile", rst_clip(column("tile"), column("wkb")))

  merged_precipitation <- summarize(
    groupBy(joined_sdf, "timestep"),
    alias(rst_merge_agg(column("tile")), "tile")
  )

  # Only one timestep survives the filter, so one merged row is expected.
  expect_equal(nrow(merged_precipitation), 1)
})
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
source("data.R")

test_that("scalar vector functions behave as intended", {
sdf <- SparkR::createDataFrame(
sdf <- createDataFrame(
data.frame(
wkt = "POLYGON ((0 0, 0 2, 1 2, 1 0, 0 0))",
wkt = "POLYGON ((2 1, 1 2, 2 3, 2 1))",
point_wkt = "POINT (1 1)"
)
)
Expand Down Expand Up @@ -52,14 +50,16 @@ test_that("scalar vector functions behave as intended", {
sdf <- withColumn(sdf, "mosaic_explode", mosaic_explode(column("wkt"), lit(1L)))
sdf <- withColumn(sdf, "mosaicfill", mosaicfill(column("wkt"), lit(1L)))

expect_no_error(SparkR::write.df(sdf, source = "noop", mode = "overwrite"))
expect_no_error(write.df(sdf, source = "noop", mode = "overwrite"))
expect_equal(nrow(sdf), 1)

})

test_that("aggregate vector functions behave as intended", {

sdf <- SparkR::sql("SELECT id as location_id FROM range(1)")
sdf <- sql("SELECT id as location_id FROM range(1)")

inputGJ <- read_file("data/boroughs.geojson")
sdf <- withColumn(sdf, "geometry", st_geomfromgeojson(lit(inputGJ)))
expect_equal(nrow(sdf), 1)

Expand Down
4 changes: 3 additions & 1 deletion R/sparkR-mosaic/tests.R
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
options(warn = -1)
library(testthat)
library(readr)

spark_location <- Sys.getenv("SPARK_HOME")
library(SparkR, lib.loc = c(file.path(spark_location, "R", "lib")))
Expand All @@ -12,7 +14,7 @@ install.packages(package_file, repos=NULL)
library(sparkrMosaic)

# find the mosaic jar in staging
staging_dir <- "/home/runner/work/mosaic/mosaic/staging/"
staging_dir <- Sys.getenv("MOSAIC_LIB_PATH", "/home/runner/work/mosaic/mosaic/staging/")
# List the staging directory, then keep only the entries whose name contains
# "jar-with-dependencies.jar". The pattern is matched literally, not as a
# regular expression. `TRUE` is spelled out because `T` is an ordinary
# variable that can be reassigned.
mosaic_jar <- list.files(staging_dir)
mosaic_jar <- mosaic_jar[grep("jar-with-dependencies.jar", mosaic_jar, fixed = TRUE)]
print("Looking for mosaic jar in")
Expand Down
17 changes: 17 additions & 0 deletions R/sparklyr-mosaic/enableGDAL.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
#' enableGDAL
#'
#' @description enableGDAL activates GDAL extensions for Mosaic by invoking the
#' static `enableGDAL` method of `com.databricks.labs.mosaic.gdal.MosaicGDAL`
#' with the Spark session obtained from `sc`.
#' @param sc Spark connection object; it is passed to
#' `sparklyr::invoke_static()` and `sparklyr::spark_session()`.
#' @name enableGDAL
#' @rdname enableGDAL
#' @return None
#' @export enableGDAL
#' @examples
#' \dontrun{
#' enableGDAL(sc)}
enableGDAL <- function(sc) {
  # Both sparklyr helpers are namespace-qualified so the call does not depend
  # on sparklyr being attached to the search path.
  sparklyr::invoke_static(
    sc,
    class = "com.databricks.labs.mosaic.gdal.MosaicGDAL",
    method = "enableGDAL",
    sparklyr::spark_session(sc)
  )
}
2 changes: 1 addition & 1 deletion R/sparklyr-mosaic/sparklyrMosaic/.Rbuildignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
^sparkrMosaic\.Rproj$
^\.Rproj\.user$
^\.Rproj\.user$
15 changes: 9 additions & 6 deletions R/sparklyr-mosaic/sparklyrMosaic/DESCRIPTION
Original file line number Diff line number Diff line change
@@ -1,20 +1,23 @@
Package: sparklyrMosaic
Title: sparklyr bindings for Databricks Mosaic
Version: 0.4.0
Authors@R:
Authors@R:
person("Robert", "Whiffin", , "[email protected]", role = c("aut", "cre")
)
Description: This package extends sparklyr to bring the Databricks Mosaic for geospatial processing APIs into sparklyr .
License: Databricks
Encoding: UTF-8
Roxygen: list(markdown = TRUE)
RoxygenNote: 7.2.3
Collate:
Collate:
'enableGDAL.R'
'enableMosaic.R'
'sparkFunctions.R'
'functions.R'
'functions.R'
Imports:
sparklyr
Suggests:
testthat (>= 3.0.0)
Config/testthat/edition: 3
Suggests:
testthat (>= 3.0.0),
sparklyr.nested (>= 0.0.4),
readr (>= 2.1.5)
Config/testthat/edition: 3
Binary file not shown.
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
inputGJ = '{
{
"type":"Feature",
"properties":{
"shape_area":"0.0000607235737749",
Expand Down Expand Up @@ -225,4 +225,4 @@ inputGJ = '{
]
]
}
}'
}
Binary file not shown.
Loading

0 comments on commit a7ab70b

Please sign in to comment.