#!/usr/bin/env python
# Copyright 2020 ARC Centre of Excellence for Climate Extremes
# author: Scott Wales <scott.wales@unimelb.edu.au>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Functions for locating and analysing 'events' within a dataset
Locate where events are with :func:`find_events`, then analyse them with
:func:`map_events()` to create a :class:`pandas.DataFrame`.
"""
from dask.dataframe.core import new_dd_object
from dask.delayed import tokenize
import numpy
import dask
import dask.dataframe
import pandas
import xarray
import typing as T
import sparse
from .helpers import (
array_blocks_to_dataframe,
locate_block_in_dataarray,
map_blocks_array_to_dataframe,
map_blocks_to_delayed,
)
def find_events(
da: xarray.DataArray,
min_duration: int = 1,
    use_dask: T.Optional[bool] = None,
compute: bool = True,
) -> pandas.DataFrame:
"""Find 'events' in a DataArray mask
    Events are defined as being active when the array value is truthy. You
    should generally pass in the result of a comparison against some kind of
    threshold:
>>> da = xarray.DataArray([0,1,1,1,0,1,1], dims=['time'])
>>> find_events(da > 0)
time event_duration
0 1 3
1 5 2
    It's assumed that events are reasonably sparse for large arrays.

    If `use_dask` is True, or if `use_dask` is None and the source data is
    chunked, then events will be searched for in each Dask chunk and the
    results aggregated afterward.
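    For example, with a dataset chunked along non-time dimensions (a sketch;
    the 'lat'/'lon' names, chunk sizes and `threshold` are illustrative)::

        chunked = da.chunk({'lat': 50, 'lon': 50})
        events = find_events(chunked > threshold, compute=False)
        events = events.compute()  # dask.dataframe -> pandas.DataFrame
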
Args:
da (:class:`xarray.DataArray`): Input mask, valid when an event is
active. Must have a 'time' dimension, dtype is expected to be bool
(or something else that is truthy when an event is active)
min_duration (:class:`int`): Minimum event duration to return
use_dask (:class:`bool`): Enable Dask parallelism (default True if da
is chunked)
compute (:class:`bool`): Compute the dask operations. Note that if
False the dataframe index will not have unique values
Returns:
A :class:`pandas.DataFrame` containing event start points and
durations. This will contain columns for each dimension in da, as well
as an 'event_duration' column
"""
chunks = getattr(da, "chunks", None)
# if use_dask is None and chunks is not None:
# # Decide whether to apply on blocks
# time_ax = da.get_axis_num("time")
# if len(chunks[time_ax]) > 1:
# use_dask = False
if chunks is None or use_dask is False:
events = find_events_block(
da, min_duration=min_duration, offset=(0,) * da.ndim, load=False
)
else:
# Map function to each dask block
data = da.data
def find_events_block_helper(
block, name, dims, coords, min_duration, block_info=None
):
da = locate_block_in_dataarray(block, name, dims, coords, block_info[0])
offset = [off[0] for off in block_info[0]["array-location"]]
return find_events_block(da, offset, min_duration)
block_events = data.map_blocks(
find_events_block_helper,
da.name,
da.dims,
da.coords,
min_duration,
meta=numpy.array((), dtype="i"),
)
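        # Tree-reduce the per-block event tables along the time axis: 'chunk'
        # passes each block's table through unchanged, 'combine' joins tables
        # from neighbouring time chunks, and 'aggregate' performs the final
        # join and drops events shorter than min_duration (events touching a
        # chunk edge were kept regardless of length so they could be joined
        # here)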
def join_events_chunk(block, axis, keepdims):
return block
def join_events_combine(blocks, axis, keepdims):
return join_events(blocks)
def join_events_aggregate(blocks, axis, keepdims):
e = join_events(blocks)
return e[e.event_duration >= min_duration]
joined_events = dask.array.reduction(
block_events,
join_events_chunk,
join_events_aggregate,
combine=join_events_combine,
axis=da.get_axis_num("time"),
dtype="i",
concatenate=False,
meta=numpy.array((), dtype="i"),
)
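        # Empty frame describing the output schema, so Dask can construct the
        # dataframe lazily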
meta = pandas.DataFrame()
meta["time"] = pandas.Series([], dtype="i")
for d in da.dims:
if d != "time":
meta[d] = pandas.Series([], dtype="i")
meta["event_duration"] = pandas.Series([], dtype="i")
events = array_blocks_to_dataframe(joined_events, meta)
if compute:
events = events.compute()
events.reset_index(drop=True, inplace=True)
# Final cleanup of events at the start/end of chunks
events = events[events.event_duration >= min_duration]
return events
def find_events_block(
da: xarray.DataArray,
offset: T.Iterable[int],
min_duration: int = 1,
load: bool = True,
) -> pandas.DataFrame:
"""Find 'events' in a section of a DataArray
See :func:`find_events`
Intended to run on a Dask block, with the results from each block merged.
If an event is active at the first or last timestep it is returned
regardless of its duration, so it can be joined with neighbours
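    A minimal sketch of direct use on an in-memory block, where `offset`
    gives the block's starting index along each of `da`'s dimensions::

        da = xarray.DataArray([1, 1, 0, 1], dims=['time'])
        find_events_block(da > 0, offset=(0,))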
"""
if load:
da = da.load()
duration = numpy.atleast_1d(numpy.zeros_like(da.isel(time=0), dtype="i4"))
columns = ["time", *[d for d in da.dims if d != "time"], "event_duration"]
records = []
def add_events(locations):
end_locations = numpy.nonzero(locations)
end_durations = duration[end_locations]
start_times = t - end_durations
# Reset events that have ended
duration[end_locations] = 0
if len(end_durations) == 0:
return
if len(columns) == 2:
# 1d input dataset
data = numpy.stack([start_times, end_durations], axis=1)
else:
data = numpy.concatenate(
[start_times[None, :], end_locations, end_durations[None, :]], axis=0
).T
df = pandas.DataFrame(data=data, columns=columns)
# Add an event if:
# * duration >= min_duration
# * start_times == 0
# * t == da.sizes['time']
if t == da.sizes["time"]:
records.append(df)
else:
records.append(
df[numpy.logical_or(df.event_duration >= min_duration, df.time == 0)]
)
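    # 't' is read by add_events via its closure; initialise it here in case
    # the time dimension is empty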
t = 0
for t in range(da.sizes["time"]):
axis = da.get_axis_num("time")
assert isinstance(axis, int)
current_step = dask.array.atleast_1d(numpy.take(da.data, t, axis=axis))
try:
current_step = current_step.compute()
except:
pass
# Add the current step
duration = duration + numpy.where(current_step, 1, 0)
# End points are where we have an active duration but no event in the current step
add_events(numpy.logical_and(duration > 0, numpy.logical_not(current_step)))
    # Step past the last index, then add any events still active at the end
    t += 1
add_events(duration > 0)
if len(records) > 0:
result = pandas.concat(records, ignore_index=True)
else:
result = pandas.DataFrame(None, columns=columns, dtype="int64")
for d, off in zip(da.dims, offset):
result[d] += off
return result
def map_events(
da: xarray.DataArray, events: pandas.DataFrame, func, *args, **kwargs
) -> pandas.DataFrame:
"""Map a function against multiple events
The output is the value from func evaluated at each of the events. Events
should at a minimum have columns for each coordinate in da as well as an
'event_duration' column that records how long each event is, as is returned by
:func:`find_events`:
>>> da = xarray.DataArray([0,1,1,1,0,1,1], dims=['time'])
>>> events = find_events(da > 0)
>>> map_events(da, events, lambda x: x.sum().item())
0 3
1 2
dtype: int64
You may wish to filter the events DataFrame first to combine close events or to
remove very short events.
    If func returns a dict, the results will be converted into columns. This
    is more efficient than running map_events once for each operation:
>>> da = xarray.DataArray([0,1,1,1,0,1,1], dims=['time'])
>>> events = find_events(da > 0)
>>> map_events(da, events, lambda x: {'mean': x.mean().item(), 'std': x.std().item()})
mean std
0 1.0 0.0
1 1.0 0.0
:meth:`pandas.DataFrame.join` can be used to link up the results with their
corresponding coordinates:
>>> da = xarray.DataArray([0,1,1,1,0,1,1], dims=['time'])
>>> events = find_events(da > 0)
>>> sums = map_events(da, events, lambda x: {'sum': x.sum().item()})
>>> events.join(sums)
time event_duration sum
0 1 3 3
1 5 2 2
Args:
da (:class:`xarray.DataArray`): Source data values
events (:class:`pandas.DataFrame`): Event start & durations, e.g. from
:func:`find_events`
func ((:class:`xarray.DataArray`, \*args, \*\*kwargs) -> Dict[str, Any]): Function to apply to each event
\*args, \*\*kwargs: Passed to func
Returns:
:class:`pandas.DataFrame` with each row the result of applying func to
the corresponding event row. Behaves like
:meth:`pandas.DataFrame.apply` with result_type='expand'
"""
def map_func(e):
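        # Select the event's coordinates, with 'time' expanded from the start
        # index to a slice covering the full event duration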
coords = {k: e.loc[k] for k in da.dims}
coords["time"] = slice(coords["time"], coords["time"] + e["event_duration"])
values = da.isel(coords)
return func(values, *args, **kwargs)
return events.apply(map_func, axis="columns", result_type="expand")
def atleastn(da: xarray.DataArray, n: int, dim: str = "time") -> xarray.DataArray:
"""
Filter to return values with at least n contiguous points around them
>>> da = xarray.DataArray([0,1.4,0.8,1,-0.1,2.9,0.6], dims=['time'])
>>> atleastn(da.where(da > 0), 3)
<xarray.DataArray (time: 7)>
array([nan, 1.4, 0.8, 1. , nan, nan, nan])
Dimensions without coordinates: time
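    The filter also works on Dask-backed arrays, where the reduction is
    applied blockwise (a sketch; the chunk size is illustrative)::

        atleastn(da.chunk({'time': 4}).where(da > 0), 3)
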
Args:
da (:class:`xarray.DataArray`): Pre-filtered event values
n (:class:`int`): Minimum event length
dim (:class:`str`): Dimension to work on
Returns:
:class:`xarray.DataArray` with events from da that are longer than n
along dimension dim
"""
def atleastn_helper(array, axis, n):
axis = axis[0]
count = numpy.zeros_like(numpy.take(array, 0, axis=axis), dtype="i4")
        mask = numpy.ones_like(numpy.take(array, 0, axis=axis), dtype="bool")
for i in range(array.shape[axis]):
array_slice = numpy.take(array, i, axis=axis)
# Increase the count when there is a valid value, reset when there is not
count = numpy.where(numpy.isfinite(array_slice), count + 1, 0)
# Add new points when the contiguous count exceeds the threshold
mask = numpy.where(count >= n, False, mask)
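        # rolling(center=True) hands this helper one window per output point;
        # the window's centre element is the value for that output point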
out_slice = numpy.take(array, array.shape[axis] // 2, axis=axis)
r = numpy.where(mask, numpy.nan, out_slice)
return r
def atleastn_dask_helper(array, axis, n):
r = dask.array.map_blocks(
atleastn_helper, array, drop_axis=axis, axis=axis, n=n, dtype=array.dtype
)
return r
if isinstance(da.data, dask.array.Array):
reducer = atleastn_dask_helper
else:
reducer = atleastn_helper
r = da.rolling({dim: n * 2 - 1}, center=True, min_periods=1).reduce(reducer, n=n)
return r
def event_coords(da: xarray.DataArray, events: pandas.DataFrame) -> pandas.DataFrame:
"""
Converts the index values returned by :func:`find_events` to coordinate values
>>> da = xarray.DataArray([0,1,1,1,0,1,1], coords=[('time', pandas.date_range('20010101', periods=7, freq='D'))])
>>> events = find_events(da > 0)
>>> event_coords(da, events)
time event_duration
0 2001-01-02 3 days
1 2001-01-06 NaT
    If 'events' has an 'event_duration' column this will be converted to a
    time duration. If an event extends to the end of the data, its duration
    is marked as NaT (not a time), as the true end date is unknown.
Args:
da (:class:`xarray.DataArray`): Source data values
events (:class:`pandas.DataFrame`): Event start & durations, e.g. from
:func:`find_events` or :func:`extend_events`
Returns:
:class:`pandas.DataFrame` with the same columns as 'events', but with
index values converted to coordinates
"""
coords = {}
for d in da.dims:
coords[d] = da[d].values[events[d].values]
if "event_duration" in events:
end_index = events["time"].values + events["event_duration"].values
end = da["time"].values[numpy.clip(end_index, 0, da.sizes["time"] - 1)]
coords["event_duration"] = end - coords["time"]
coords["event_duration"][end_index >= da.sizes["time"]] = numpy.timedelta64(
"nat"
)
return pandas.DataFrame(coords, index=events.index)
def extend_events(events: pandas.DataFrame) -> pandas.DataFrame:
"""
Extend the 'events' DataFrame to hold indices for the full event duration
    :func:`find_events` returns only the start index of events. This will
    extend the DataFrame to cover the indices of the entire event. In
    addition to the indices, a column 'event_id' gives each row's matching
    index in 'events'.
>>> da = xarray.DataArray([0,1,1,1,0,1,1], coords=[('time', pandas.date_range('20010101', periods=7, freq='D'))])
>>> events = find_events(da > 0)
>>> extend_events(events)
time event_id
0 1 0
1 2 0
2 3 0
3 5 1
4 6 1
    Args:
        events (:class:`pandas.DataFrame`): Event start & durations, e.g. from
            :func:`find_events`
"""
def extend_row(row):
repeat = numpy.repeat(row.values[None, :], row["event_duration"], axis=0)
df = pandas.DataFrame(repeat, columns=row.index)
df["time"] = row["time"] + numpy.arange(row["event_duration"])
df["event_id"] = row.name
del df["event_duration"]
return df
return pandas.concat(
events.apply(extend_row, axis="columns").values, ignore_index=True
)
def event_da(
da: xarray.DataArray, events: pandas.DataFrame, values: numpy.ndarray
) -> xarray.DataArray:
"""
Create a :obj:`xarray.DataArray` with 'values' at event locations
Args:
da (:class:`xarray.DataArray`): Source data values
events (:class:`pandas.DataFrame`): Index values, e.g. from
:func:`find_events` or :func:`extend_events`
values (:class:`numpy.ndarray`-like): Value to give to each location specified by event
    Returns:
        :obj:`xarray.DataArray` with the same axes as `da`, with the values at
        the locations in `events` given by `values`
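    A sketch of typical use, labelling each point of `da` covered by an event
    with that event's id (via :func:`extend_events`)::

        events = find_events(da > 0)
        extended = extend_events(events)
        labels = event_da(da, extended, extended['event_id'].values)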
"""
s = sparse.COO(
[events[d] for d in da.dims], values, shape=da.shape, fill_value=numpy.nan
)
# It's more helpful to have a dense array, but let's only convert
# what we need when we need it using dask
# Add dask chunking
try:
chunks = da.data.chunks
except AttributeError:
chunks = "auto"
d = dask.array.from_array(s, chunks=chunks)
# Add an operation that converts chunks to dense when needed
def dense_block(b):
return b.todense()
dense = d.map_blocks(dense_block, dtype=d.dtype)
# Use the input data's coordinates
return xarray.DataArray(dense, coords=da.coords)
def join_events(
    events: T.List[pandas.DataFrame],
    offsets: T.Optional[T.List[T.List[int]]] = None,
    dims: T.Optional[T.Tuple[T.Hashable, ...]] = None,
) -> pandas.DataFrame:
"""
Join consecutive events in multiple dataframes
The returned events will be in an arbitrary order, the index may not match
entries from the input dataframes.
>>> events = [
... pandas.DataFrame([[1, 2]], columns=["time", "event_duration"]),
... pandas.DataFrame([[3, 1]], columns=["time", "event_duration"]),
... ]
>>> join_events(events)
time event_duration
0 1 3
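    When `offsets` and `dims` are given, the input dataframes are grouped by
    their non-time offsets, so only dataframes from the same spatial block
    are joined, with each group joined in parallel via :func:`dask.delayed`
    (a sketch with hypothetical block results df_a and df_b)::

        join_events(
            [df_a, df_b],
            offsets=[[0, 0], [2, 0]],
            dims=('time', 'lat'),
        )
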
Args:
        events: List of results from :func:`find_events`
        offsets: Optional block offsets, one per dataframe in 'events', each
            giving that block's starting index along each dimension
        dims: Dimension names corresponding to the offset entries (required
            when 'offsets' is given)
Returns:
:obj:`pandas.DataFrame` where results that end when the next event
starts are joined together
"""
if offsets is not None:
# Join like offsets
if dims is None:
raise Exception("Must supply dims with offsets")
df = pandas.DataFrame(offsets, columns=dims)
df["events"] = events
results = []
spatial_dims = [d for d in dims if d != "time"]
for k, g in df.groupby(spatial_dims, sort=False):
results.append(dask.delayed(join_events)(g.events.values))
results = dask.compute(results)[0]
return pandas.concat(results, ignore_index=True)
if isinstance(events, pandas.DataFrame):
events = [events]
if len(events) == 0:
return None
if len(events) == 1:
return events[0]
results = []
for i in range(0, len(events) - 1):
next_t0 = events[i + 1]["time"].min()
if numpy.isnan(next_t0):
results.append(events[i])
continue
stop = events[i]["time"] + events[i]["event_duration"]
results.append(events[i][stop < next_t0])
events[i + 1] = _merge_join(events[i][stop >= next_t0], events[i + 1])
results.append(events[-1])
return pandas.concat(results)
def _merge_join(df_x: pandas.DataFrame, df_y: pandas.DataFrame) -> pandas.DataFrame:
"""
    Join events across two dataframes, where an event in df_x ends exactly
    where an event in df_y starts (df_y.time == df_x.time + df_x.event_duration)
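    A sketch: an event in df_x ending where one in df_y begins becomes a
    single longer event::

        df_x = pandas.DataFrame([[0, 2]], columns=['time', 'event_duration'])
        df_y = pandas.DataFrame([[2, 3]], columns=['time', 'event_duration'])
        _merge_join(df_x, df_y)  # one event: time 0, event_duration 5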
"""
spatial_dims = list(df_x.columns[1:-1])
df_x = df_x.copy()
df_x["stop"] = df_x["time"] + df_x["event_duration"]
# First step of df_y
t0 = df_y["time"].min()
# Merge df_x with the first step from df_y
merged = df_x.merge(
df_y[df_y["time"] == t0],
left_on=["stop", *spatial_dims],
right_on=["time", *spatial_dims],
how="outer",
)
merged["time_x"].fillna(merged["time_y"], inplace=True, downcast="infer")
merged["event_duration_x"].fillna(0, inplace=True, downcast="infer")
merged["event_duration_y"].fillna(0, inplace=True, downcast="infer")
result = pandas.DataFrame()
result["time"] = merged["time_x"]
for d in spatial_dims:
result[d] = merged[d]
result["event_duration"] = merged["event_duration_x"] + merged["event_duration_y"]
return pandas.concat([result, df_y[df_y["time"] > t0]], ignore_index=True)
def event_values(
da: xarray.DataArray, events: pandas.DataFrame, use_dask=True
) -> dask.dataframe.DataFrame:
"""
Gets the values from da where an event is active
Note that for a large dataset with many events this can consume a
considerable amount of memory. `use_dask=True` will return an uncomputed
:obj:`dask.dataframe.DataFrame` instead of a :obj:`pandas.DataFrame`,
potentially saving memory. You can compute the results later with
:meth:`dask.dataframe.DataFrame.compute()`.
>>> da = xarray.DataArray(
... [0,3,6,2,0,8,6],
... coords=[('time',
... pandas.date_range('20010101', periods=7, freq='D'))])
>>> events = find_events(da > 0)
>>> event_values(da, events)
time event_id value
0 1 0 3
1 2 0 6
2 3 0 2
3 5 1 8
4 6 1 6
See the Dask DataFrame documentation for performance information. In general basic
reductions (min, mean, max, etc.) should be fast.
>>> values = event_values(da, events)
>>> values.groupby('event_id').value.mean()
event_id
0 3.666667
1 7.000000
Name: value, dtype: float64
    For custom aggregations, setting an index first may help performance.
    Note that this sorts the data, so it may use a lot of memory:
>>> values = values.set_index('event_id')
>>> values.groupby('event_id').value.apply(lambda x: x.min())
event_id
0 2
1 6
Name: value, dtype: int64
Args:
da (:class:`xarray.DataArray`): Source data values
events (:class:`pandas.DataFrame`): Event start & durations, e.g. from
:func:`find_events`
use_dask (bool): If true, returns an uncomputed Dask DataFrame. If false,
computes the values before returning
Returns:
:obj:`dask.dataframe.DataFrame` with columns event_id, time, value
"""
def event_values_helper(a, name, dims, coords, events, block_info=None):
da = locate_block_in_dataarray(a, name, dims, coords, block_info[0])
offset = [off[0] for off in block_info[0]["array-location"]]
return event_values_block(da, events, offset)
meta = pandas.DataFrame(
{
"time": pandas.Series([], dtype="i"),
"event_id": pandas.Series([], dtype="i"),
"value": pandas.Series([], dtype=da.dtype),
}
)
if getattr(da, "chunks", None) is None:
return event_values_block(da, events, [0] * da.ndim)
values = da.data.map_blocks(
event_values_helper,
da.name,
da.dims,
da.coords,
events=events,
meta=numpy.array((), dtype="i"),
)
    df = array_blocks_to_dataframe(values, meta)
    if not use_dask:
        df = df.compute()
    return df
def event_values_block(
da: xarray.DataArray,
events: pandas.DataFrame,
offset: T.Union[T.Iterable[int], T.Dict[T.Hashable, int]],
load: bool = True,
):
"""
    Gets the values from da where an event is active, for a single block

    See :func:`event_values`

    Args:
        da (:class:`xarray.DataArray`): Block data values
        events (:class:`pandas.DataFrame`): Event start & durations, e.g. from
            :func:`find_events`
        offset: The block's starting index along each dimension of the full
            array, either as a sequence ordered like `da.dims` or a dict
            keyed by dimension name
        load (:class:`bool`): Load the block into memory before processing
    """
if load:
da = da.load()
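    # Per-point trackers: the id of the event currently active at each
    # spatial point, and how many more steps it remains active (-1 when no
    # event is active)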
event_id = numpy.atleast_1d(-1 * numpy.ones(da.isel(time=0).shape, dtype="i"))
event_duration = numpy.atleast_1d(-1 * numpy.ones(da.isel(time=0).shape, dtype="i"))
times = []
event_ids = []
event_values = []
if not isinstance(offset, dict):
offset = dict(zip(da.dims, offset))
t_offset = offset["time"]
events = filter_block(da, events, offset)
# Events that started before this block
active_events = events[
(events["time"] < t_offset)
& (events["time"] + events["event_duration"] >= t_offset)
]
if active_events.size > 0:
indices = tuple(
[active_events[c] - offset[c] for c in active_events.columns[1:-1]]
)
event_id[indices] = active_events.index.values
event_duration[indices] = (
active_events.time + active_events.event_duration - t_offset
)
for t in range(da.sizes["time"]):
# Add newly active events
active_events = events[events["time"] == t + t_offset]
if active_events.size > 0:
# Indices in the block of current events
indices = tuple(
[
active_events[c].values - offset[c]
for c in active_events.columns[1:-1]
]
)
# Set values at the current events
event_id[indices] = active_events.index.values
event_duration[indices] = active_events.event_duration
# Currently active coordinates
indices = numpy.where(event_duration > 0)
# Get values
event_ids.append(event_id[indices])
event_values.append(numpy.atleast_1d(da.isel(time=t).values)[indices])
times.append(numpy.ones_like(event_ids[-1]) * (t + t_offset))
# Decrease durations
event_duration -= 1
if len(times) == 0:
return None
df = pandas.DataFrame(
{
"time": numpy.concatenate(times),
"event_id": numpy.concatenate(event_ids),
"value": numpy.concatenate(event_values),
}
)
return df
def filter_block(
da: xarray.DataArray, events: pandas.DataFrame, offset: T.Dict[T.Hashable, int]
) -> pandas.DataFrame:
"""
    Filters events to those that fall within the current block along the
    non-time dimensions; 'time' is not filtered, since events may span
    time chunks
"""
for i, d in enumerate(da.dims):
if d != "time":
events = events[
(offset[d] <= events[d]) & (events[d] < offset[d] + da.shape[i])
]
return events