# Summary

Dask-geomodeling uses dask to compute derivatives of GIS datasets. It uses
its own ``RasterBlock`` and ``GeometryBlock`` classes to implement operations.
``dask`` has ``dask.Array`` and ``dask.DataFrame``, which are much more
feature-rich and better maintained. Also, `dask-geopandas` already implements
parallelized versions of geometry operations. This document investigates
whether we can replace our own implementations with native dask(-geopandas)
functions.

# Motivation

We notice a lack of interest from the community in ``dask-geomodeling``.
While it solves our use case well, we want a community-supported package.
Our resources are limited and we need broader usage and third-party
maintenance to sustain a high-quality and feature-rich package.

# Dask

## Summary of dask inner workings

Dask is a flexible library for parallel computing in Python. Dask contains
chunked representations of arrays (``dask.Array``) and dataframes
(``dask.DataFrame``). These chunked representations are both described by the
term ``collection``.

Fundamentally, a collection has a method ``__dask_graph__`` that returns a
``Mapping`` (e.g. ``dict``) representing the computation, for example:

```
# add(inc(1), 10)
d = {'x': 1,
     'y': (inc, 'x'),
     'z': (add, 'y', 10)}
```

By nesting dask operations, one expands the Dask graph with additional
entries. In a collection (like ``dask.Array``) each chunk is represented by
an entry in the Dask graph, like so:

```
# a = da.from_array(h5py.File('data.h5')['data'], chunks=(256, 256))
{
    'array-original-9d696618e0408fc49a3c1f8cd9fc449d': <HDF5 dataset "data">,
    ('array-9d696618e0408fc49a3c1f8cd9fc449d', 0, 0): (<getter>, 'array-original-9d696618e0408fc49a3c1f8cd9fc449d', (slice(0, 256, None), slice(0, 256, None))),
    ('array-9d696618e0408fc49a3c1f8cd9fc449d', 0, 1): (<getter>, 'array-original-9d696618e0408fc49a3c1f8cd9fc449d', (slice(0, 256, None), slice(256, 512, None))),
    ...
}
```

Because the chunks are (in general) not dependent on one another, the
operations on them can be computed in parallel.

### Subsetting (slicing)

Subsetting a dask collection just adds extra keys to the Dask graph:

```
# a = da.from_array(h5py.File('data.h5')['data'], chunks=(256, 256))[:100, :100]
{
    'array-original-9d696618e0408fc49a3c1f8cd9fc449d': <HDF5 dataset "data">,
    ('array-9d696618e0408fc49a3c1f8cd9fc449d', 0, 0): (<getter>, 'array-original-9d696618e0408fc49a3c1f8cd9fc449d', (slice(0, 256, None), slice(0, 256, None))),
    ('array-9d696618e0408fc49a3c1f8cd9fc449d', 0, 1): (<getter>, 'array-original-9d696618e0408fc49a3c1f8cd9fc449d', (slice(0, 256, None), slice(256, 512, None))),
    ...
    ('getitem-0e3c9567c48dd924f83f2db186dbf8f7', 0, 0): (<getitem>, ('array-9d696618e0408fc49a3c1f8cd9fc449d', 0, 0), (slice(0, 100, 1), slice(0, 100, 1)))
}
```

It is clear, however, that most chunks are not needed and can be discarded
from the task graph. In Dask, this optimization (``cull``) is applied by
default before a computation starts. Note that the full 256x256 chunk is still
loaded and then sliced. This makes sense, as the data is probably also stored
(and compressed) in chunks.
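As an aside, the culling step can be reproduced by hand. Below is a minimal
sketch (reusing the toy ``inc``/``add`` graph from above, extended with a task
that the requested key does not need) of what ``dask.optimization.cull`` does
before a computation starts:

```
from operator import add

from dask.optimization import cull

def inc(i):
    return i + 1

# the toy graph from above, plus a branch that 'z' does not need
dsk = {
    'x': 1,
    'y': (inc, 'x'),
    'z': (add, 'y', 10),
    'unused': (add, 'x', 100),
}

# keep only the tasks that are required to compute 'z'
culled, dependencies = cull(dsk, keys=['z'])
print(culled)
# {'x': 1, 'y': (inc, 'x'), 'z': (add, 'y', 10)}
```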
# dask-geomodeling

Dask-geomodeling abstracts data layers ("blocks") that are to be subsetted
later. For instance:

```
from dask_geomodeling.raster import RasterFileSource, Add

source = RasterFileSource('/path/to/geotiff')
source_plus_two = Add(source, 2)  # or: source + 2
```

Each ``Block`` exposes a ``get_compute_graph()`` method that takes a "request"
containing the subsetting arguments (bbox, width, height) and returns a dask
graph.

```
request = {
    "mode": "vals",
    "bbox": (138000, 480000, 139000, 481000),
    "projection": "epsg:28992",
    "width": 256,
    "height": 256
}
graph, name = source_plus_two.get_compute_graph(**request)
```

``get_compute_graph()`` recursively calls through the stack of blocks. In this
example, the method on ``Add`` calls the method on ``RasterFileSource``. The
end result is equivalent to the graph of a dask collection, but without the
built-in chunking:

```
# name:
'add_<token>'

# graph:
{
    'raster_file_source_<token>': (<RasterFileSource function>, "/path/to/geotiff", {
        "mode": "vals",
        "bbox": (138000, 480000, 139000, 481000),
        "projection": "epsg:28992",
        "width": 256,
        "height": 256
    }),
    'add_<token>': (<Add function>, "raster_file_source_<token>", 2),
}
```

# Partitioning

Partitioning is a first-class property of a dask collection: a dask collection
is in fact a collection of partitions (a.k.a. chunks). In dask-geomodeling,
however, partitioning is done by adding a separate `RasterTiler` block to the
expression. This block splits up the request in its `get_compute_graph()`
method.

```
tiled = RasterTiler(source_plus_two, 500)  # 500x500 m tiles
graph, name = tiled.get_compute_graph(**request)
# graph now contains 4 raster_file_source, 4 add, and 1 tiler functions
```

This difference stems from a difference in applications. The main application
of `dask-geomodeling` is inside a WMS server, which requests very small
(256 x 256 pixel) tiles, often from datasets with dimensions on the order of
`1e6` pixels. Using such chunk sizes with `dask`, you would end up with on the
order of `1e7` partitions. Holding dictionaries of that size in memory (and
optimizing them!) would certainly degrade performance.

## Comparison

|                    | dask                         | dask-geomodeling                         |
|--------------------|------------------------------|------------------------------------------|
| dataset interface  | collection                   | Block                                    |
| graph              | ``Array().__dask_graph__()`` | ``Block().get_compute_graph(**request)`` |
| graph construction | when building the collection | when calling get_compute_graph()         |
| data partitioning  | at the data source           | at the last block (``RasterTiler``)      |
| slicing            | cull the complete graph      | slice before building the graph          |
| resampling         | explicit operation           | done automatically                       |
| reprojecting       | explicit operation           | done automatically                       |

The main feature of dask-geomodeling is that it can do a "pushdown" of the
subsetting request without materializing the task graph.

## Dask - layers and HighLevelGraphs

In recent years, dask abstracted the concept of ``__dask_graph__()`` and added
``__dask_layers__()`` to its collections. ``__dask_layers__()`` contains only
one entry per operation ("layer"), and ``__dask_graph__()`` only lazily
evaluates to a dictionary (with all chunks in it). This allows efficient
high-level optimizations without dealing with the potentially large Dask
graph.

However, only some collections actually have high-level graphs, and many
operations cause the high-level graph to materialize itself, thereby losing
the performance benefits.
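As an illustration, here is a small inspection sketch (assuming a reasonably
recent dask version) that shows the layer view next to the materialized graph:

```
import dask.array as da

a = da.ones((1024, 1024), chunks=(256, 256))
b = (a + 1)[:100, :100]

hlg = b.__dask_graph__()    # a HighLevelGraph, not a plain dict
print(list(hlg.layers))     # roughly one layer per operation: ones, add, getitem
print(b.__dask_layers__())  # the name(s) of the output layer
print(len(dict(hlg)))       # materializing expands it to one task per chunk
```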
There also seems to be consensus within dask that the HighLevelGraph
implementation is too complex and needs a better approach.

Pushing slices through these layers would solve the issue described above.
This "slice pushdown" has been receiving a lot of attention in Dask lately:

- [SQL predicate pushdown in dask.DataFrame, 2017](https://github.com/dask/dask/issues/1957)
- [Plan for high level graph optimizations, 2019](https://github.com/dask/dask/issues/5644)
- [Discussion: making slicing scale better](https://github.com/dask/dask/issues/5918)
- [Pushdown of slicing operator on arrays?, 2020](https://github.com/dask/dask/issues/6288)
- [Array slicing HighLevelGraph layer](https://github.com/dask/dask/pull/7655)
- [High Level Expressions, 2021](https://github.com/dask/dask/issues/7933)
- [dask/dask#9159, Jun 2022](https://github.com/dask/dask/issues/9159)

It appears to be complex to solve in general.

# Steps to take to bring dask-geomodeling closer to dask/dask-geopandas

## Expression system

The mai

## Summary and example

First we construct a dask collection with only one chunk:

```
arr = dask_geomodeling.GeoArray.from_tiff("/path/to/geotiff", with_chunks=False)
arr_plus_two = arr + 2
```

The resulting collection must be JSON (de)serializable:

```
arr_plus_two = from_json(arr_plus_two.to_json())
```

A geometric slice can be applied as follows:

```
arr_plus_two_sliced = arr_plus_two.subset(bbox, width, height, crs)
```

The `subset` call, or some other rewrite rule, must rewrite the complete
expression into a chunked representation, without generating too many chunks.

## Subclassing dask.Array

We need a CRS- and GeoTransform-aware version of dask.Array. One could
subclass `da.Array` and use `subok=True` with operations. Or maybe xarray
already provides this option?

## Adopting dask-geopandas

Dask-geopandas seems to provide a drop-in replacement for the few geometry
functions we have. The current ``field_operations`` are simply present in
``dask.DataFrame``.

## (de)serialization of dask collections

This still needs to be looked into. Collections should be serialized at the
expression level (so `dask.Add(some_layer, 1)`, not some task graph).

## subset / Rewrite Rule

We could try to make a Rewrite Rule
([docs](https://docs.dask.org/en/latest/optimize.html#rewrite-rules)) that
pushes a subset through the graph until it reaches the data source, and then
change the data source so that it only reads the relevant pixels. It can
operate either on the layers or on the dask graph itself.

This requires registering the "push-through" logic per supported function.
That looks worse than it is: most functions are elementwise, so a slice can
simply be swapped with them. For example:

```
b = (a + 1)[:10]
```

is equivalent to:

```
b = a[:10] + 1
```

The per-operation logic is actually quite similar to the ``get_compute_graph``
method currently implemented.
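To make the per-operation pushdown concrete, here is a hand-rolled sketch that
operates on raw task tuples. The function name and the single `add` rule are
illustrative only; a real implementation could use the rewrite-rule machinery
linked above and would need one rule per supported operation, with the data
sources translating the pushed-down index into a windowed read.

```
from operator import add, getitem

def push_getitem_through_add(task):
    """Rewrite getitem(add(x, const), idx) into add(getitem(x, idx), const)."""
    if isinstance(task, tuple) and task and task[0] is getitem:
        _, inner, index = task
        if isinstance(inner, tuple) and inner and inner[0] is add:
            _, operand, const = inner
            return (add, (getitem, operand, index), const)
    return task

# b = (a + 1)[:10], written as a dask task:
task = (getitem, (add, 'a', 1), slice(0, 10))
print(push_getitem_through_add(task))
# -> (add, (getitem, 'a', slice(0, 10)), 1), i.e. b = a[:10] + 1
```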