Tiled aggregation from points with Dask

Group points into grid cells in a tiled manner, using Dask.

Introduction

In example aggregate.py an approach is demonstrated where points are grouped in cells and averaged per cell. This example does the same, but in a tiled manner. Dask is used for the groupby opertaion instead of Pandas. The main difference is that Dask divides the data into chunks that can be processed independently. This approach is beneficial if data does not comfortably fit into memory. A second reason to use this approach is the ability to process different chunks concurrently, which can provide a significant speedup.

Note

Even though parallel processing using Dask can speed up the script, for a small case like this the overhead is larger than the benefit obtained by parallelizing the operation.

For a description of the approach used, please refer to aggregate.py. This example mostly highlights the differences between the two approaches.

Note

Dask uses the terms ‘chunks’, ‘blocks’ and ‘partitions’. While there are some nuances, in this example these terms will be used interchangeably. I will try to stick with ‘chunks’ where I can since this is the general term, though you will find references to ‘map_blocks()’ for Arrays and ‘map_partitions()’ for DataFrames.

Generate input data

Let’s start by generating some points. The data will be a set of points scattered around a circle to create a dougnut-like shape. I will first generate the points and the turn them into a Dask Array using dask.dataframe.from_array. In a scenario where Dask is used because the data is large and memory is limited, you will likely obtain your dask array in a way that does not involve first loading the whole array into memory like I do here. I will print npartitions to show the number of chunks the array is divided in (spoiler, it will be 2000/500 = 4).

import dask
import dask.array
import dask.dataframe

from gridkit.doc_utils import generate_2d_scatter_doughnut, plot_polygons

points = generate_2d_scatter_doughnut(num_points=2000, radius=4)
df = dask.dataframe.from_array(points, columns=["pnt_x", "pnt_y"], chunksize=500)
print(df.npartitions)  # show the number of chunks
/opt/hostedtoolcache/Python/3.9.20/x64/lib/python3.9/site-packages/dask/dataframe/__init__.py:42: FutureWarning:
Dask dataframe query planning is disabled because dask-expr is not installed.

You can install it with `pip install dask[dataframe]` or `conda install dask`.
This will raise in a future version.

  warnings.warn(msg, FutureWarning)
4

Relate points to grid cells

Now we can create a grid and determine the cell associated to each point. No advanced knowledge of Dask is required to follow this example. It suffices to know that ‘map_blocks’ and ‘map_partitions’ will apply the supplied function to all chunks. GridIndex.index_1d() is used because it gives a single number that can be used as a DataFrame index during the groupby operation.

The column ‘nr_points’ will contain the nr of points per cell once ‘.count()’ is called on the groupy object

Note

The DataFrame only has a ‘pnt_x’ and a ‘pnt_y’ column. which is already the appropriate shape for grid.cell_at_point. However, p[["pnt_x", "pnt_y"]] is specified here to make clear that cell_at_point expects this pattern and any other columns in the DataFrame will need to be ignored for this operation.

from gridkit import GridIndex, HexGrid

grid = HexGrid(size=1, shape="flat")
df["cell_id"] = df.map_partitions(
    lambda p: grid.cell_at_point(p[["pnt_x", "pnt_y"]]).index_1d
)
df["nr_points"] = 1
print(df)
/home/runner/work/GridKit/GridKit/gridkit/hex_grid.py:100: UserWarning: A 'flat' ``shape`` will be deprecated in version v1.0.0. It is advised to use ``rotation=30`` instead.
  warnings.warn(
Dask DataFrame Structure:
                 pnt_x    pnt_y cell_id nr_points
npartitions=4
0              float64  float64   int64     int64
500                ...      ...     ...       ...
1000               ...      ...     ...       ...
1500               ...      ...     ...       ...
1999               ...      ...     ...       ...
Dask Name: assign, 6 graph layers

We can group by the ‘cell_id’. That means points with the same ‘cell_id’ will be combined. ‘count()’ then gets the number of points for every ‘cell_id’.

grouped = df.groupby("cell_id")
occurrences = grouped.count()

Now we have the number of points per cell. In order to get the Polygon of each corresponding cell, ‘occurrences.index’ can be used, but since it is a 1d index, it first needs to be converted to a normal index using GridIndex.from_index_1d()

Note

If you are wondering why we went through all that effort just to end up with a pandas DataFrame in the end, suffice it to say that the size of the data is significantly reduced after the groupby. Also, for the sake of the example I will plot the data with matplotlib.

polygons = occurrences.index.map_partitions(
    lambda id: grid.to_shapely(GridIndex.from_index_1d(id)), meta={"geoms": object}
)
geoms, points_per_cell = dask.compute(polygons, occurrences.nr_points)

Visualize the results

Note how the results are identical to the pandas version. The input data was the same so this should indeed be the case.

import matplotlib.pyplot as plt

plot_polygons(geoms, points_per_cell, "Blues")
plt.title("Number of points per cell")
plt.show()
Number of points per cell

Total running time of the script: (0 minutes 0.370 seconds)

Gallery generated by Sphinx-Gallery