-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdask_singlenode.py
106 lines (78 loc) · 3.63 KB
/
dask_singlenode.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
"""
An example Python script showing how to calculate NDVI for three Sentinel satellite images
in parallel with the Dask and xarray libraries.
All the files are processed in parallel with the help of Dask delayed functions; see the main() function.
More info about the Python Dask library can be found at:
https://docs.dask.org/en/latest/why.html
Author: Johannes Nyman, Samantha Wittke, CSC
"""
import os
import sys
import xarray as xr
import time
from dask import delayed
from dask import compute
### This import comes from another Python file called rasterio_to_xarray.py
### It can be downloaded from https://github.com/robintw/XArrayAndRasterio/blob/master/rasterio_to_xarray.py
from rasterio_to_xarray import xarray_to_rasterio
### Folder that holds the input Sentinel SAFE folders; taken from the first command-line argument
image_folder = sys.argv[1]

## Results are written to this folder, relative to the current working directory.
## exist_ok=True makes the creation idempotent and avoids the race between
## checking os.path.exists() and calling os.makedirs().
output_folder = "output"
os.makedirs(output_folder, exist_ok=True)
def readImage(image_folder_fp):
    """Open the red (B04) and near-infrared (B08) 10 m bands of one Sentinel image.

    The folder is searched recursively for files whose names end in
    ``_B04_10m.jp2`` (red) and ``_B08_10m.jp2`` (nir). If several files
    match the same suffix, the last one visited by ``os.walk`` is used.

    Args:
        image_folder_fp: Path of the image folder to search.

    Returns:
        A ``(red, nir)`` tuple with the two opened bands, each divided by 10000.

    Raises:
        FileNotFoundError: If the red or the nir band file is not found
            anywhere under ``image_folder_fp``.
    """
    print("Reading Sentinel image from: %s" % (image_folder_fp))

    ### Rather than figuring out what the filepath inside the SAFE folder is,
    ### just find the red and nir files by their file name endings
    red_suffix = "_B04_10m.jp2"
    nir_suffix = "_B08_10m.jp2"
    red_fp = None
    nir_fp = None
    for subdir, _dirs, files in os.walk(image_folder_fp):
        for filename in files:
            if filename.endswith(red_suffix):
                red_fp = os.path.join(subdir, filename)
            elif filename.endswith(nir_suffix):
                nir_fp = os.path.join(subdir, filename)

    ### Fail with a clear message instead of an UnboundLocalError further down
    ### when the folder does not contain the expected band files
    missing = [
        suffix
        for suffix, path in ((red_suffix, red_fp), (nir_suffix, nir_fp))
        if path is None
    ]
    if missing:
        raise FileNotFoundError(
            "Missing band file(s) %s under: %s" % (", ".join(missing), image_folder_fp)
        )

    ### Read the red and nir band files to xarray and with the chunk-option to dask
    chunking = {'band': 1, 'x': 1024, 'y': 1024}
    red = xr.open_rasterio(red_fp, chunks=chunking)
    nir = xr.open_rasterio(nir_fp, chunks=chunking)

    ### Scale the image values back to real reflectance values
    ### NOTE(review): xarray arithmetic drops .attrs by default - confirm that
    ### nothing downstream relies on attributes set by open_rasterio.
    red = red / 10000
    nir = nir / 10000

    return red, nir
def calculateNDVI(red, nir):
    """Compute NDVI from the red and near-infrared bands with xarray.

    NDVI is ``(nir - red) / (nir + red)``. Pixels where both bands are
    zero get the value 0 instead of the ratio.

    Args:
        red: The red band.
        nir: The near-infrared band.

    Returns:
        The result of ``xr.where`` selecting 0 or the NDVI ratio per pixel.
    """
    print("Computing NDVI")

    ## Pixels where both red and nir are zero
    both_zero = (nir == 0) & (red == 0)

    ## Normalised difference of the two bands
    ratio = (nir - red) / (nir + red)

    return xr.where(both_zero, 0, ratio)
def processImage(image_folder_fp):
    """Run the whole workflow for one image; this is the unit that gets parallelized.

    Reads the red and nir bands, computes NDVI, and saves the result under a
    name derived from the last path component of ``image_folder_fp``.

    Args:
        image_folder_fp: Path of the image folder to process.

    Returns:
        The base name of ``image_folder_fp``.
    """
    ## Open the two bands needed for NDVI
    red_band, nir_band = readImage(image_folder_fp)

    ## Derive the NDVI from the opened bands
    ndvi_result = calculateNDVI(red_band, nir_band)

    ## The image is named after its folder; store the result under that name
    name = os.path.basename(image_folder_fp)
    saveImage(ndvi_result, name)

    return name
def saveImage(ndvi, image_name):
    """Write an NDVI result into the output folder.

    The output file name is ``image_name`` with ".SAFE" replaced by
    "_NDVI.tif". Writing is delegated to ``xarray_to_rasterio`` from the
    separate rasterio_to_xarray.py file.

    Args:
        ndvi: The NDVI data to write.
        image_name: Name of the source image.
    """
    ## Build the full output path in one go
    output_path = os.path.join(
        output_folder,
        image_name.replace(".SAFE", "_NDVI.tif"),
    )
    print("Saving image: %s" % output_path)
    xarray_to_rasterio(ndvi, output_path)
def main():
    """Create one delayed processImage task per sub-folder of image_folder and run them."""
    ## Delayed tasks collected here are executed together by compute()
    tasks = []

    ## Go through every entry of the input folder and keep only directories
    for entry in os.listdir(image_folder):
        candidate = os.path.join(image_folder, entry)
        if not os.path.isdir(candidate):
            continue
        print(candidate)
        ### Wrap processImage for this image as a delayed call
        tasks.append(delayed(processImage)(candidate))

    ## The Dask graph is complete; execute it with the resources available
    compute(tasks)
if __name__ == '__main__':
    ## Time the whole run and report the elapsed seconds
    started_at = time.time()
    main()
    elapsed = time.time() - started_at
    print("Script completed in " + str(elapsed) + " seconds")