Groupby

[1]:
import xarray
import climtas
import dask.array
import pandas
import numpy

Say we have daily input data for several years that we want to convert to a daily mean climatology.

[2]:
time = pandas.date_range('20010101', '20040101', freq='D', closed='left')

data = dask.array.random.random((len(time),50,100), chunks=(90,25,25))
lat = numpy.linspace(-90, 90, data.shape[1])
lon = numpy.linspace(-180, 180, data.shape[2], endpoint=False)

da = xarray.DataArray(data, coords=[('time', time), ('lat', lat), ('lon', lon)], name='temperature')
da
[2]:
<xarray.DataArray 'temperature' (time: 1095, lat: 50, lon: 100)>
dask.array<random_sample, shape=(1095, 50, 100), dtype=float64, chunksize=(90, 25, 25), chunktype=numpy.ndarray>
Coordinates:
  * time     (time) datetime64[ns] 2001-01-01 2001-01-02 ... 2003-12-31
  * lat      (lat) float64 -90.0 -86.33 -82.65 -78.98 ... 78.98 82.65 86.33 90.0
  * lon      (lon) float64 -180.0 -176.4 -172.8 -169.2 ... 169.2 172.8 176.4

The Xarray way is to use xarray.DataArray.groupby; however, that is an expensive operation. We started with 104 tasks and 104 chunks in the Dask graph, and this has exploded to 23,464 tasks and 2,920 chunks. For a large dataset this increase in chunk count really bogs down Dask.

The reason for this is that with groupby Xarray creates a new output chunk for each individual day of year; you can see the chunk size of the output is now (1, 25, 25).

[3]:
da.groupby('time.dayofyear').mean()
[3]:
<xarray.DataArray 'temperature' (dayofyear: 365, lat: 50, lon: 100)>
dask.array<stack, shape=(365, 50, 100), dtype=float64, chunksize=(1, 25, 25), chunktype=numpy.ndarray>
Coordinates:
  * lat        (lat) float64 -90.0 -86.33 -82.65 -78.98 ... 82.65 86.33 90.0
  * lon        (lon) float64 -180.0 -176.4 -172.8 -169.2 ... 169.2 172.8 176.4
  * dayofyear  (dayofyear) int64 1 2 3 4 5 6 7 8 ... 359 360 361 362 363 364 365
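
The task and chunk counts quoted above can be checked directly on the underlying Dask arrays. A minimal sketch (exact task counts can vary a little between Dask and Xarray versions):

[ ]:
daily_mean = da.groupby('time.dayofyear').mean()

# Number of tasks in the Dask graph and total number of chunks,
# for the input array and for the groupby result
print(len(da.data.dask), da.data.npartitions)                  # roughly 104 tasks, 104 chunks
print(len(daily_mean.data.dask), daily_mean.data.npartitions)  # roughly 23,464 tasks, 2,920 chunks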

climtas.blocked.blocked_groupby limits the number of chunks created as much as possible. It does this by reshaping the array to stack individual years, then reducing over the new stacked axis, rather than using Pandas indexing operations (a rough sketch of this idea follows the example below). It does, however, require the input data to be evenly spaced in time, which well-behaved datasets should be.

[4]:
climtas.blocked_groupby(da, time='dayofyear').mean()
[4]:
<xarray.DataArray 'stack-f4e41af3171d33521253e01e4a44f4a5' (dayofyear: 366, lat: 50, lon: 100)>
dask.array<mean_agg-aggregate, shape=(366, 50, 100), dtype=float64, chunksize=(80, 25, 25), chunktype=numpy.ndarray>
Coordinates:
  * lat        (lat) float64 -90.0 -86.33 -82.65 -78.98 ... 82.65 86.33 90.0
  * lon        (lon) float64 -180.0 -176.4 -172.8 -169.2 ... 169.2 172.8 176.4
  * dayofyear  (dayofyear) int64 1 2 3 4 5 6 7 8 ... 360 361 362 363 364 365 366
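
The reshaping idea can be sketched directly with Dask. This is a rough illustration only, with the three 365-day years hard-coded for clarity (blocked_groupby itself pads each year out to 366 values, which is why the dayofyear axis above has length 366):

[ ]:
# Stack the three years along a new leading axis: (year, dayofyear, lat, lon)
stacked = da.data.reshape(3, 365, 50, 100)

# Reducing over the 'year' axis gives one value per day of year while keeping
# large chunks along the dayofyear axis, instead of one chunk per day
climatology = stacked.mean(axis=0)
climatology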