[WIP] Moving closer to Dask #87

New file: ``rfc/0001-dask.md`` (200 additions)

# Summary

Dask-geomodeling uses dask to compute derivatives of GIS datasets. It uses its
own ``RasterBlock`` and ``GeometryBlock`` classes to implement operations.
However, ``dask`` has ``dask.Array`` and ``dask.DataFrame``, which are far more
feature-rich and much better maintained. This document investigates whether we
can replace our own implementations with native dask functions.

# Motivation

We notice a lack of interest in, or understanding of, ``dask-geomodeling`` from
the community. While it solves our use case well, we would like to "hook on" to
the broader ecosystem, both to benefit from the large body of work done by the
community and to contribute something back.

There are also some minor quality concerns in dask-geomodeling that would be
resolved by using dask more directly.

# Dask

## Summary of dask inner workings

Dask is a flexible library for parallel computing in Python. Dask contains
chunked representations of arrays (``dask.Array``) and dataframes
(``dask.DataFrame``). Both of these chunked representations are described by the
term ``collection``, which is defined [here](https://docs.dask.org/en/latest/custom-collections.html?highlight=collection#the-dask-collection-interface).

Fundamentally, a collection has a ``__dask_graph__`` method that returns a
``Mapping`` (e.g. a ``dict``) representing the computation, for example:

```
# add(inc(1), 10)
d = {'x': 1,
     'y': (inc, 'x'),
     'z': (add, 'y', 10)}
```
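
Such a hand-written graph can be evaluated directly with one of dask's
schedulers. A minimal, self-contained sketch with placeholder ``inc`` and
``add`` implementations:

```
import dask

def inc(x):
    return x + 1

def add(x, y):
    return x + y

# add(inc(1), 10): evaluate key 'z' with the synchronous scheduler
d = {'x': 1, 'y': (inc, 'x'), 'z': (add, 'y', 10)}
assert dask.get(d, 'z') == 12
```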

By nesting dask operations, one actually expands the Dask graph with additional
elements. In a collection (like ``dask.Array``) each chunk is represented by
an entry in the Dask graph, like so:

```
# a = da.from_array(h5py.File('data.h5')['data'], chunks=(256, 256))
{
'array-original-9d696618e0408fc49a3c1f8cd9fc449d': <HDF5 dataset "data": shape (2048, 2048), type "<f4">,
('array-9d696618e0408fc49a3c1f8cd9fc449d', 0, 0): (<function getter at 0x7fd0158875e0>, 'array-original-9d696618e0408fc49a3c1f8cd9fc449d', (slice(0, 256, None), slice(0, 256, None))),
('array-9d696618e0408fc49a3c1f8cd9fc449d', 0, 1): (<function getter at 0x7fd0158875e0>, 'array-original-9d696618e0408fc49a3c1f8cd9fc449d', (slice(0, 256, None), slice(256, 512, None))),
...
}
```

Because the chunks are (in general) not dependent on one another, the operations
on them can be computed in parallel. This is what dask is all about.

### Partitioning

The chosen partitioning directly affects the parallel computation. The
partitioning is defined at the dataset source (there are repartitioning methods,
but I will not go into those here). If the partitions are too large, there is not
much to parallelize. If they are too small, the graph itself becomes very large
and the overhead of Dask kicks in.
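
As a small illustration (the array and chunk sizes here are arbitrary), the
chunk size chosen when wrapping a source determines how many tasks end up in
the graph:

```
import numpy as np
import dask.array as da

data = np.zeros((2048, 2048), dtype="f4")

# 256x256 chunks give an 8x8 grid of tasks; larger chunks mean fewer,
# heavier tasks, smaller chunks mean a bigger graph with more overhead.
a = da.from_array(data, chunks=(256, 256))
print(a.numblocks)  # (8, 8)
```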

### Subsetting (slicing)

Subsetting a dask collection just adds extra keys to the Dask graph:

```
# a = da.from_array(h5py.File('data.h5')['data'], chunks=(256, 256))[:100, :100]
{
'array-original-9d696618e0408fc49a3c1f8cd9fc449d': <HDF5 dataset "data": shape (2048, 2048), type "<f4">,
('array-9d696618e0408fc49a3c1f8cd9fc449d', 0, 0): (<function getter at 0x7fd0158875e0>, 'array-original-9d696618e0408fc49a3c1f8cd9fc449d', (slice(0, 256, None), slice(0, 256, None))),
('array-9d696618e0408fc49a3c1f8cd9fc449d', 0, 1): (<function getter at 0x7fd0158875e0>, 'array-original-9d696618e0408fc49a3c1f8cd9fc449d', (slice(0, 256, None), slice(256, 512, None))),
...
('getitem-0e3c9567c48dd924f83f2db186dbf8f7', 0, 0): (<built-in function getitem>, ('array-dcdf53315ae307db7a324f3f431affaf', 0, 0), (slice(0, 10, 1), slice(0, 10, 1)))
}
```

It is clear, however, that most chunks are not needed and can be discarded from
the task graph. In Dask, this optimization (``cull``) is applied by default before
starting a computation. Note that the full 256x256 chunk is still loaded and then
sliced. This makes sense: the data is probably also stored in chunks, so there is
little to gain from reading subsets of chunks. Still, when loading data over the
network, for instance, it would make sense to optimize this further.
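
For illustration, ``cull`` can also be applied by hand; a minimal sketch with a
toy graph (not dask-geomodeling code):

```
from dask.optimization import cull

def inc(x):
    return x + 1

d = {'x': 1, 'y': (inc, 'x'), 'unused': (inc, 'y')}

# Keep only the tasks that key 'y' actually depends on; the unreachable
# 'unused' entry is dropped from the graph.
culled, dependencies = cull(d, keys=['y'])
assert 'unused' not in culled
```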

There are ongoing efforts to "push down" slices in dask before building the
complete collection, but the current Dask version does not support this. It is a
complex problem to solve in general.

### Computing

The Dask graph contains all information necessary to compute a result. Given a
target name and a graph, Dask's computation methods can produce that result.
Elements in the graph that do not depend on each other may be computed in
parallel. There are multiple schedulers to choose from; notably,
``dask.distributed`` performs the computation on a cluster. The computation
itself is out of scope for this document.
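
For completeness, a minimal computation example (using the threaded scheduler
here; ``dask.distributed`` would be configured separately):

```
import numpy as np
import dask.array as da

a = da.from_array(np.arange(16).reshape(4, 4), chunks=(2, 2))

# compute() evaluates the graph; chunks that do not depend on each other
# are processed in parallel by the selected scheduler.
result = (a + 1).sum().compute(scheduler="threads")
print(result)  # 136
```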

## Summary of dask-geomodeling inner workings

Dask-geomodeling is a collection of classes that are stacked together to create
configurations for on-the-fly operations on geographical maps. Datasets are
represented by ``Block`` subclasses. Each ``Block`` exposes a
``get_compute_graph()`` method that takes a request like the one below and
produces a Dask graph:

```
from dask_geomodeling.raster import RasterFileSource
import dask

source = RasterFileSource('/path/to/geotiff')
request = {
    "mode": "vals",
    "bbox": (138000, 480000, 139000, 481000),
    "projection": "epsg:28992",
    "width": 256,
    "height": 256,
}
graph, name = source.get_compute_graph(**request)
data = dask.get(graph, [name])
```

``get_compute_graph()`` recursively calls through the stack of blocks. For
example, ``((source + 1) * 10).get_compute_graph()`` would first call the method
on ``Multiply``, then on ``Add``, and finally on ``RasterFileSource``. The end
result is a Dask graph that can be evaluated by dask.
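
Continuing the example above, a composed view is requested in the same way (a
sketch; ``Add`` and ``Multiply`` are the blocks created behind the ``+`` and
``*`` operators):

```
# The arithmetic operators wrap `source` in Add / Multiply blocks; the
# request is only applied when get_compute_graph() is called.
view = (source + 1) * 10
graph, name = view.get_compute_graph(**request)
data = dask.get(graph, [name])
```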

### Comparison

| aspect             | dask                     | dask-geomodeling                    |
|--------------------|--------------------------|-------------------------------------|
| dataset interface  | collection               | Block                               |
| graph              | ``__dask_graph__()``     | ``get_compute_graph()``             |
| graph construction | when building collection | when calling get_compute_graph()    |
| data partitioning  | at datasource            | at the last block (``RasterTiler``) |
| slicing            | cull the complete graph  | slice before building the graph     |

Why does dask-geomodeling have such a different design? The crucial differences
are in the partitioning and slicing behaviour. In our applications we often have
very large (1e6 x 1e6) datasets and only want very small (typically 256 x 256)
slices of them.

For such datasets, 5000x5000 is a reasonable partition size; one such partition
is about 100 MB of data. However, when requesting a typical 256x256 slice, you
would then need to compute at least 100 MB of data! If, on the other hand, you
set the partition size to the typical slice size (256x256), you end up with on
the order of 1e7 partitions.

Holding dictionaries of that size in memory (and optimizing them!) will certainly
degrade performance.
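
For reference, the arithmetic behind these numbers (assuming float32 data):

```
# a 5000x5000 float32 partition: 5000 * 5000 * 4 bytes = 100 MB
partition_mb = 5000 * 5000 * 4 / 1e6

# a 1e6 x 1e6 raster cut into 256x256 partitions: ~1.5e7 graph entries
n_partitions = (10**6 // 256) ** 2

print(partition_mb, n_partitions)
```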

To solve this issue, dask-geomodeling only constructs the graph once the slice
is known. This different design led to a whole separate set of operations: the
counterpart of ``dask.Array`` is ``dask_geomodeling.RasterBlock``, and so on.

## Dask - layers and HighLevelGraphs

In recent years, dask abstracted the concept behind ``__dask_graph__()`` and
added ``__dask_layers__()`` to the collections. ``__dask_layers__()`` contains
only one entry per operation ("layer"), and ``__dask_graph__()`` only lazily
evaluates to a dictionary (with all chunks in it). This allows efficient
high-level optimizations without dealing with the potentially large Dask graph.
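
For example, with a recent dask version (the layer names are illustrative):

```
import numpy as np
import dask.array as da

a = da.from_array(np.zeros((2048, 2048), dtype="f4"), chunks=(256, 256))
b = (a + 1)[:100, :100]

# One layer per operation instead of one key per chunk.
print(list(b.__dask_graph__().layers))  # ['array-...', 'add-...', 'getitem-...']
print(b.__dask_layers__())              # ('getitem-...',)
```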

Pushing slices through these layers would solve the above issue. This topic is
receiving attention in Dask currently, but it appears to be complex to solve
in general.


# Ideas to use dask more directly

## No chunking

You can use dask without chunking the data source (that is, having only 1 chunk).
We could try to write a Rewrite Rule ([docs](https://docs.dask.org/en/latest/optimize.html#rewrite-rules))
that pushes a slice through the graph until it reaches the data source and then
changes the data source so that it only reads the relevant pixels. It could
operate either on the layers or on the dask graph itself.

This requires registering "push-through" logic per supported function. This looks
worse than it is: most functions are elementwise, so a slice can simply be
swapped with them. For example:

```
b = (a + 1)[:10]
```

Is equivalent to:

```
b = a[:10] + 1
```
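
This equivalence is easy to verify with plain dask arrays (a quick sanity
check, not a rewrite rule implementation):

```
import numpy as np
import dask.array as da

a = da.from_array(np.arange(100), chunks=10)

# Slicing commutes with the elementwise +1, so both graphs compute the
# same values.
left = ((a + 1)[:10]).compute()
right = (a[:10] + 1).compute()
assert (left == right).all()
```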

The per-operation logic is actually quite similar to the ``get_compute_graph`` method currently
implemented.

This "slice pushdown" has been receiving a lot of attention in Dask lately:
- [SQL predicate pushdown in dask.DataFrame, 2017](https://github.com/dask/dask/issues/1957)
- [Plan for high level graph optimizations, 2019](https://github.com/dask/dask/issues/5644)
- [Discussion: making slicing scale better](https://github.com/dask/dask/issues/5918)
- [Pushdown of slicing operator on arrays?, 2020](https://github.com/dask/dask/issues/6288)
- [Array slicing HighLevelGraph layer](https://github.com/dask/dask/pull/7655)

I am unsure whether the high-level solutions offered in dask will perform well
enough for our web applications.