How to make use of Dask clusters
This notebook shows how to use Dask clusters to parallelize workflows. Parallelization is useful when a workflow requires more computing resources than are available locally (or here in your DeepESDL workspace). The methods presented in this notebook take care of setting up the cluster with a desired number of workers, installing the environment on the worker nodes, and performing the parallel computation. When a task is finished and you no longer need the resources, you can shut down the cluster with a single line of code, releasing all resources.
If you are new to Dask, you can get an introduction here. Please also refer to the [DeepESDL documentation](https://deepesdl.readthedocs.io/en/latest/guide/jupyterlab/) and visit the platform's website for further information!
Brockmann Consult, 2024
To run this notebook, you need a Python environment with:
- python=3.11
- xcube>=1.1.2
- coiled
- dask
- ipykernel
To create a custom environment, please refer to the documentation.
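Before running the notebook, you may want to verify that the active environment meets these requirements. Below is a minimal sanity check using only the standard library; the package names are the PyPI distribution names:

from importlib.metadata import version
import sys

# quick check that the environment matches the requirements above
print(sys.version_info[:2])  # expect (3, 11)
print(version('xcube'))      # expect >= 1.1.2
print(version('coiled'))
print(version('dask'))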
If your team has access to the Coiled service, you have to execute the following line once in your terminal:
$ coiled login --account bc --token $COILED_TOKEN
You only need to do this once, before your very first use.
from xcube.util.dask import new_cluster
from dask.distributed import Client
import os
# mandatory: the Dask cluster service mirrors your currently selected
# environment and does not take the default environment
os.environ.pop('JUPYTER_IMAGE', None)  # pop() avoids a KeyError if the variable is unset
Set up Dask cluster
Before starting a cluster, think about a good cluster name as well as how many workers you expect to need for your task. The cluster name is helpful in case your Jupyter notebook crashes or you wish to connect to a running cluster from another notebook: new_cluster() will recognize that a cluster with the given name is already running and connect to it.
The following cell creates a cluster with two worker nodes using the default instance type (spot instances). For additional settings, please refer to the xcube documentation.
Whenever you start a cluster, please make sure to shut it down with cluster.shutdown()
once you are finished! Otherwise, it will keep consuming your processing units.
# the default instance type provides 4 cores per worker
cluster = new_cluster(name='team_computing_monthly_means', n_workers=2)
Coiled Cluster: https://cloud.coiled.io/clusters/592165?account=bc
Name: team_computing_monthly_means      Region: eu-central-1
Scheduler Status: started               Scheduler: m6i.xlarge
Workers: m6i.xlarge (2)                 Workers Requested: 2
Dashboard: https://cluster-zppgt.dask.host?token=gRqjkEMXnTB-t8k4
(2024/09/18 08:49:33 UTC) All workers ready.
Check that your newly started cluster is running, then connect it to the Dask client:
# check cluster status
cluster.status
<Status.running: 'running'>
client = Client(cluster)
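To confirm the connection, you can inspect the client object; its dashboard link is handy for watching tasks while they run:

# print the scheduler address and worker count, then the dashboard URL
print(client)
print(client.dashboard_link)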
To test the cluster, we will compute monthly means for a variable from the Earth System Data Cube provided by DeepESDL. For more inspiration on when Dask may be useful, check out the examples section.
Access data and create monthly means
from xcube.core.store import new_data_store
store = new_data_store("s3", root="deep-esdl-public", storage_options=dict(anon=True))
store.list_data_ids()
['LC-1x2160x2160-1.0.0.levels',
 'SMOS-L2C-OS-20230101-20231231-1W-res0-1x1000x1000.levels',
 'SMOS-L2C-OS-20230101-20231231-1W-res0-53x120x120.zarr',
 'SMOS-L2C-OS-20230101-20231231-1W-res0.zarr',
 'SMOS-L2C-SM-20230101-20231231-1W-res0-1x1000x1000.levels',
 'SMOS-L2C-SM-20230101-20231231-1W-res0-53x120x120.zarr',
 'SMOS-L2C-SM-20230101-20231231-1W-res0.zarr',
 'SMOS-freezethaw-1x720x720-1.0.1.zarr',
 'SMOS-freezethaw-4267x10x10-1.0.1.zarr',
 'SeasFireCube_v3.zarr',
 'black-sea-1x1024x1024.levels',
 'black-sea-256x128x128.zarr',
 'esa-cci-permafrost-1x1151x1641-0.0.2.levels',
 'esdc-8d-0.25deg-1x720x1440-3.0.1.zarr',
 'esdc-8d-0.25deg-256x128x128-3.0.1.zarr',
 'extrAIM-merged-cube-1x86x179.zarr',
 'hydrology-1D-0.009deg-100x60x60-3.0.2.zarr',
 'hydrology-1D-0.009deg-1418x70x76-2.0.0.zarr',
 'hydrology-1D-0.009deg-1x1102x2415-2.0.0.levels',
 'hydrology-1D-0.009deg-1x1102x966-3.0.2.levels',
 'ocean-1M-9km-1x1080x1080-1.4.0.levels',
 'ocean-1M-9km-64x256x256-1.4.0.zarr',
 'polar-100m-1x2048x2048-1.0.1.zarr']
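If you want more details about a dataset before opening it, the xcube store API also provides a describe_data() method. A small sketch, assuming the returned descriptor's to_dict() serialization:

# fetch metadata for a single dataset without opening it
descriptor = store.describe_data('esdc-8d-0.25deg-256x128x128-3.0.1.zarr')
print(descriptor.to_dict())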
We select a cube that contains 256 time steps in one chunk.
dataset = store.open_data('esdc-8d-0.25deg-256x128x128-3.0.1.zarr')
dataset
<xarray.Dataset> Size: 353GB
Dimensions:  (time: 1978, lat: 720, lon: 1440)
Coordinates:
  * lat      (lat) float64 6kB -89.88 -89.62 ... 89.88
  * lon      (lon) float64 12kB -179.9 ... 179.9
  * time     (time) datetime64[ns] 16kB 1979-01-05 ...
Data variables: (12/42)
    aerosol_optical_thickness_550      (time, lat, lon) float32 8GB dask.array<chunksize=(256, 128, 128), meta=np.ndarray>
    air_temperature_2m                 (time, lat, lon) float32 8GB dask.array<chunksize=(256, 128, 128), meta=np.ndarray>
    bare_soil_evaporation              (time, lat, lon) float32 8GB dask.array<chunksize=(256, 128, 128), meta=np.ndarray>
    burnt_area                         (time, lat, lon) float64 16GB dask.array<chunksize=(256, 128, 128), meta=np.ndarray>
    cot                                (time, lat, lon) float32 8GB dask.array<chunksize=(256, 128, 128), meta=np.ndarray>
    cth                                (time, lat, lon) float32 8GB dask.array<chunksize=(256, 128, 128), meta=np.ndarray>
    ...                                 ...
    sif_rtsif                          (time, lat, lon) float32 8GB dask.array<chunksize=(256, 128, 128), meta=np.ndarray>
    sm                                 (time, lat, lon) float32 8GB dask.array<chunksize=(256, 128, 128), meta=np.ndarray>
    snow_sublimation                   (time, lat, lon) float32 8GB dask.array<chunksize=(256, 128, 128), meta=np.ndarray>
    surface_moisture                   (time, lat, lon) float32 8GB dask.array<chunksize=(256, 128, 128), meta=np.ndarray>
    terrestrial_ecosystem_respiration  (time, lat, lon) float32 8GB dask.array<chunksize=(256, 128, 128), meta=np.ndarray>
    transpiration                      (time, lat, lon) float32 8GB dask.array<chunksize=(256, 128, 128), meta=np.ndarray>
Attributes: (12/23)
    Conventions:               CF-1.9
    acknowledgment:            All ESDC data providers are acknowledged insi...
    contributor_name:          ['University of Leipzig', 'Max Planck Institu...
    contributor_url:           ['https://www.uni-leipzig.de/', 'https://www....
    creator_name:              ['University of Leipzig', 'Brockmann Consult ...
    creator_url:               ['https://www.uni-leipzig.de/', 'https://www....
    ...                        ...
    publisher_url:             https://www.earthsystemdatalab.net/
    time_coverage_end:         2021-12-31T00:00:00.000000000
    time_coverage_start:       1979-01-05T00:00:00.000000000
    time_period:               8D
    time_period_reported_day:  5.0
    title:                     Earth System Data Cube (ESDC) v3.0.1
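The repr above already shows chunksize=(256, 128, 128); you can also verify the chunking programmatically via the underlying Dask array:

# confirm that one chunk spans 256 time steps
print(dataset['air_temperature_2m'].data.chunksize)  # (256, 128, 128)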
Select a variable for which you would like to calculate monthly means. In this example, we select air_temperature_2m.
air_dataset = dataset[['air_temperature_2m']]
air_dataset
<xarray.Dataset> Size: 8GB
Dimensions:  (time: 1978, lat: 720, lon: 1440)
Coordinates:
  * lat      (lat) float64 6kB -89.88 -89.62 -89.38 ... 89.62 89.88
  * lon      (lon) float64 12kB -179.9 -179.6 -179.4 ... 179.6 179.9
  * time     (time) datetime64[ns] 16kB 1979-01-05 ... 2021-12-31
Data variables:
    air_temperature_2m  (time, lat, lon) float32 8GB dask.array<chunksize=(256, 128, 128), meta=np.ndarray>
Attributes: (12/23)
    Conventions:               CF-1.9
    acknowledgment:            All ESDC data providers are acknowledged insi...
    contributor_name:          ['University of Leipzig', 'Max Planck Institu...
    contributor_url:           ['https://www.uni-leipzig.de/', 'https://www....
    creator_name:              ['University of Leipzig', 'Brockmann Consult ...
    creator_url:               ['https://www.uni-leipzig.de/', 'https://www....
    ...                        ...
    publisher_url:             https://www.earthsystemdatalab.net/
    time_coverage_end:         2021-12-31T00:00:00.000000000
    time_coverage_start:       1979-01-05T00:00:00.000000000
    time_period:               8D
    time_period_reported_day:  5.0
    title:                     Earth System Data Cube (ESDC) v3.0.1
Next, we specify how to resample the dataset. Note that this is a lazy operation: after executing the next cell, air_dataset_monthly
will describe one value per month in its metadata and structure, but the data itself has not yet been resampled.
air_dataset_monthly = air_dataset.resample({'time':'1ME'}).mean()
air_dataset_monthly
<xarray.Dataset> Size: 2GB
Dimensions:  (time: 516, lat: 720, lon: 1440)
Coordinates:
  * lat      (lat) float64 6kB -89.88 -89.62 -89.38 ... 89.62 89.88
  * lon      (lon) float64 12kB -179.9 -179.6 -179.4 ... 179.6 179.9
  * time     (time) datetime64[ns] 4kB 1979-01-31 ... 2021-12-31
Data variables:
    air_temperature_2m  (time, lat, lon) float32 2GB dask.array<chunksize=(1, 128, 128), meta=np.ndarray>
Attributes: (12/23)
    Conventions:               CF-1.9
    acknowledgment:            All ESDC data providers are acknowledged insi...
    contributor_name:          ['University of Leipzig', 'Max Planck Institu...
    contributor_url:           ['https://www.uni-leipzig.de/', 'https://www....
    creator_name:              ['University of Leipzig', 'Brockmann Consult ...
    creator_url:               ['https://www.uni-leipzig.de/', 'https://www....
    ...                        ...
    publisher_url:             https://www.earthsystemdatalab.net/
    time_coverage_end:         2021-12-31T00:00:00.000000000
    time_coverage_start:       1979-01-05T00:00:00.000000000
    time_period:               8D
    time_period_reported_day:  5.0
    title:                     Earth System Data Cube (ESDC) v3.0.1
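You can confirm the laziness yourself: the resampled variable is still a Dask array rather than loaded values. A quick check:

import dask.array as da

# True: the resampled variable is still an unevaluated Dask array
print(isinstance(air_dataset_monthly.air_temperature_2m.data, da.Array))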
Next, to use the data for further analysis, the monthly resampling needs to be computed. The following command will take some time.
computed_air_dataset_monthly = air_dataset_monthly.compute()
computed_air_dataset_monthly
<xarray.Dataset> Size: 2GB
Dimensions:  (time: 516, lat: 720, lon: 1440)
Coordinates:
  * lat      (lat) float64 6kB -89.88 -89.62 -89.38 ... 89.62 89.88
  * lon      (lon) float64 12kB -179.9 -179.6 -179.4 ... 179.6 179.9
  * time     (time) datetime64[ns] 4kB 1979-01-31 ... 2021-12-31
Data variables:
    air_temperature_2m  (time, lat, lon) float32 2GB -30.6 -30.6 ... -23.28
Attributes: (12/23)
    Conventions:               CF-1.9
    acknowledgment:            All ESDC data providers are acknowledged insi...
    contributor_name:          ['University of Leipzig', 'Max Planck Institu...
    contributor_url:           ['https://www.uni-leipzig.de/', 'https://www....
    creator_name:              ['University of Leipzig', 'Brockmann Consult ...
    creator_url:               ['https://www.uni-leipzig.de/', 'https://www....
    ...                        ...
    publisher_url:             https://www.earthsystemdatalab.net/
    time_coverage_end:         2021-12-31T00:00:00.000000000
    time_coverage_start:       1979-01-05T00:00:00.000000000
    time_period:               8D
    time_period_reported_day:  5.0
    title:                     Earth System Data Cube (ESDC) v3.0.1
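If you would like to monitor the computation while it runs, dask.distributed offers a progress bar for work submitted to the cluster. A sketch of an alternative to the blocking compute() call above, assuming the cluster is still running (persist() keeps the results in cluster memory instead of pulling them to your notebook):

from dask.distributed import progress

# submit the work to the cluster and display a progress bar
persisted = air_dataset_monthly.persist()
progress(persisted)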
Shut down Dask cluster
Once you are finished with your computations, make sure to shut down the cluster to minimize costs.
cluster.shutdown()
Note that resampling by month labels each aggregated value with the last day of the month by default (the 'ME' in '1ME' stands for month end).
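If you prefer month-start labels instead, pandas also defines the 'MS' (month start) frequency alias. A lazy sketch; defining it triggers no computation:

# label each month with its first day instead of its last
air_dataset_monthly_ms = air_dataset.resample({'time': '1MS'}).mean()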
computed_air_dataset_monthly.air_temperature_2m.isel(time=1).plot.imshow()
<matplotlib.image.AxesImage at 0x7f4d2265aed0>
If you wish to persist the dataset to your team storage, please check out the example notebook 05 SAVE CUBE TO TEAM STORAGE.
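As a minimal illustration only (that notebook covers team storage properly), a computed dataset can be written to a local Zarr with xarray; the output path here is just a placeholder:

# write the computed monthly means to a local Zarr store (placeholder path)
computed_air_dataset_monthly.to_zarr('air_temperature_2m_monthly.zarr')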