Source code for gridkit.bounded_grid

import abc
import functools
import operator
import warnings
from multiprocessing.sharedctypes import Value
from typing import Literal, Tuple

import numpy
import scipy
from pyproj import Transformer

import gridkit
from gridkit.base_grid import BaseGrid
from gridkit.errors import AlignmentError, IntersectionError
from gridkit.index import GridIndex, validate_index


class _BoundedGridMeta(type):
    """metaclass of the Raster class"""

    def __new__(cls, name, bases, namespace):
        # operators with a nan-base
        for op, as_idx in (
            (operator.mul, False),
            (operator.truediv, False),
            (operator.floordiv, False),
            (operator.pow, False),
            (operator.mod, False),
            (operator.ge, True),
            (operator.le, True),
            (operator.gt, True),
            (operator.lt, True),
        ):
            opname = "__{}__".format(op.__name__)
            opname_reversed = "__r{}__".format(op.__name__)
            normal_op, reverse_op = cls._gen_operator(
                op, base_value=numpy.nan, as_idx=as_idx
            )
            namespace[opname] = normal_op
            namespace[opname_reversed] = reverse_op

        # treat equals and not-equals as special cases to acommodate NaNs
        op = functools.partial(numpy.isclose, equal_nan=True)
        normal_op, reverse_op = cls._gen_operator(op, base_value=numpy.nan, as_idx=True)
        namespace["__eq__"] = normal_op
        namespace["__req__"] = reverse_op

        op = lambda left, right: ~numpy.isclose(left, right, equal_nan=True)
        normal_op, reverse_op = cls._gen_operator(op, base_value=numpy.nan, as_idx=True)
        namespace["__ne__"] = normal_op
        namespace["__rne__"] = reverse_op

        # operators with a zero-base
        for op, name in (
            (operator.add, "add"),
            (operator.sub, "sub"),
        ):
            opname = "__{}__".format(name)
            opname_reversed = "__r{}__".format(name)
            normal_op, reverse_op = cls._gen_operator(op, base_value=0)
            namespace[opname] = normal_op
            namespace[opname_reversed] = reverse_op

        # reduction operators
        for op, name in (
            (numpy.ma.sum, "sum"),
            (numpy.ma.mean, "mean"),
            (numpy.ma.max, "max"),
            (numpy.ma.min, "min"),
            (numpy.ma.median, "median"),
            (numpy.ma.std, "std"),
        ):
            namespace[name] = cls._gen_reduce_operator(op)

        # reduction operators (arg)
        for op, name in (
            (numpy.ma.argmax, "argmax"),
            (numpy.ma.argmin, "argmin"),
        ):
            namespace[name] = cls._gen_reduce_operator(op, as_idx=True)
        return super().__new__(cls, name, bases, namespace)

    @staticmethod
    def _gen_operator(op, base_value=numpy.nan, as_idx=False):
        """Generate operators

        Parameters
        ----------
        op: any callable operator form either `numpy` or `operator`
            A function from the `operator` package. Can also be a numpy or cusotm uperator.

        Returns
        -------
        :tuple: (`leif.core.raster._RasterMeta.<locals>.normal_op`, `leif.core.raster._RasterMeta.<locals>.reverse_op`)
            A function that takes in a left and right object to apply the supplied operation (`op`) to.\

        Raises
        ------
        NotImplementedError
            For grids of different sizes
        """

        def _grid_op(left, right, op, base_value):
            if not isinstance(right, BoundedGrid):
                data = op(left._data, right)
                if left.nodata_value is not None:
                    mask = left._data == left.nodata_value
                    data[mask] = left.nodata_value
                return left.__class__(data, bounds=left.bounds, crs=left.crs)

            if not left.intersects(right):
                raise AlignmentError(
                    "Operation not allowed on grids that do not overlap."
                )  # TODO rethink errors. Do we want an out of bounds error?

            aligned, reason = left.is_aligned_with(right)
            if not aligned:
                raise AlignmentError(f"Grids are not aligned. {reason}")

            # determine the dtype and nodata_value for the new grid, based on numpy casting rules
            def dominant_dtype_and_nodata_value(*grids):
                dtypes = [grid._data.dtype for grid in grids]
                dtype = numpy.result_type(*dtypes)

                nodata_values = [grid.nodata_value for grid in grids]
                if all([val == nodata_values[0] for val in nodata_values]):
                    nodata_value = nodata_values[0]
                elif all([val is not None for val in nodata_values]):
                    nodata_value = nodata_values[0]
                    warnings.warn(
                        f"Two grids have different `nodata_value`s: {nodata_values}. Using {nodata_value} for the resulting grid."
                    )
                elif all([val is None for val in nodata_values]):
                    nodata_value = None
                    warnings.warn(
                        f"No `nodata_value` was set on any of the grids. Any potential nodata gaps are filled with `{dtype.type(0)}`. Set a nodata_value by assigning a value to grid.nodata_value."
                    )
                else:  # one but not all grids have a nodata_value set
                    for val in nodata_values:
                        if val is not None:
                            nodata_value = val
                            break
                    else:
                        raise ValueError(
                            "Oops, something unexpected went wrong when determining the new nodata_value. To resolve this you could try setting the `nodata_value` attribute of your grids."
                        )
                return dtype, nodata_value

            dtype, nodata_value = dominant_dtype_and_nodata_value(left, right)

            # create combined grid, spanning both inputs
            combined_bounds = left.combined_bounds(right)
            _, shape = left.cells_in_bounds(
                combined_bounds, return_cell_count=True
            )  # TODO: split ids and shape outputs into different methods
            combined_data = numpy.full(
                shape,
                fill_value=dtype.type(0) if nodata_value is None else nodata_value,
                dtype=dtype,
            )

            combined_grid = left.update(
                combined_data,
                bounds=combined_bounds,
                nodata_value=nodata_value,
            )

            shared_bounds = left.shared_bounds(right)
            left_shared = left.crop(shared_bounds)
            right_shared = right.crop(shared_bounds)

            left_data = numpy.ma.masked_array(
                left_shared._data,
                left_shared.data == left_shared.nodata_value,
                dtype=dtype,
            )
            right_data = numpy.ma.masked_array(
                right_shared._data,
                right_shared.data == right_shared.nodata_value,
                dtype=dtype,
            )

            result = op(left_data, right_data)
            # assign data of `left` to combined_grid
            left_data = left._data.astype(dtype)
            if left.nodata_value is not None:
                left_data[left._data == left.nodata_value] = nodata_value
            combined_grid.assign(
                left_data, bounds=left.bounds, in_place=True, assign_nodata=False
            )

            # assign data of `right` to combined_grid
            right_data = right._data.astype(dtype)
            if right.nodata_value is not None:
                right_data[right._data == right.nodata_value] = nodata_value
            combined_grid.assign(
                right_data, bounds=right.bounds, in_place=True, assign_nodata=False
            )

            # overwrite shared area in combined_grid with the combined results
            count = gridkit.count([left, right])
            shared_mask = count == 2
            shared_mask_np = combined_grid.grid_id_to_numpy_id(shared_mask)
            result = op(left.value(shared_mask), right.value(shared_mask))
            combined_grid = combined_grid.astype(
                numpy.result_type(combined_grid._data.dtype, result.dtype)
            )  # when dividing the dtype changes
            combined_grid._data[shared_mask_np] = (
                result  # TODO: find more elegant way of updating data with grid ids as mask
            )

            return combined_grid

        def normal_op(left, right):
            if not isinstance(right, BoundedGrid):
                data = op(left._data, right)
                if left.nodata_value is not None:
                    nodata_np_id = numpy.where(left._data == left.nodata_value)
                    data[nodata_np_id] = left.nodata_value
                grid = left.update(data)
            else:
                grid = _grid_op(left, right, op, base_value=base_value)
            if not as_idx:
                return grid
            ids = left._mask_to_index(grid._data)
            return GridIndex(ids)

        def reverse_op(left, right):
            if not isinstance(right, BoundedGrid):
                data = op(right, left._data)
                if left.nodata_value is not None:
                    nodata_np_id = numpy.where(left._data == left.nodata_value)
                    data[nodata_np_id] = left.nodata_value
                grid = left.update(data)
            else:
                grid = _grid_op(left, right, op, base_value=base_value)
            return (
                grid._mask_to_index(grid._data) if as_idx else grid
            )  # TODO: left._mask_to_index(data) works if as_idx is true

        return normal_op, reverse_op

    @staticmethod
    def _gen_reduce_operator(op, as_idx=False):
        def internal(self, *args, **kwargs):
            data = self._data
            if not self.nodata_value is None:
                data = numpy.ma.masked_array(
                    data, numpy.isclose(data, self.nodata_value, equal_nan=True)
                )
            result = op(data, *args, **kwargs)

            if not as_idx:
                return result

            # since `as_idx`=True, assume result is the id corresponding to the raveled array
            # TODO: put lines below in function self.numpy_id_to_grid_id or similar. Think of raveled vs xy input
            np_id_x = int(result % self.width)
            np_id_y = int(numpy.floor(result / self.width))
            left_top = self.corners[0]
            left_top_id = self.cell_at_point(left_top + [self.dx / 2, -self.dy / 2])
            index = left_top_id + [np_id_x, -np_id_y]
            return index

        return internal


class _AbstractBoundedGridMeta(abc.ABCMeta, _BoundedGridMeta):
    """Class that enables usage of the :class:`~gridkit.bounded_grid._BoundedGridMeta` metaclass despite using ABCMeta as metaclass for the parent class."""

    pass


[docs] class BoundedGrid(metaclass=_AbstractBoundedGridMeta): def __init__( self, data: numpy.ndarray, *args, bounds: tuple, nodata_value=None, prevent_copy: bool = False, **kwargs, ) -> None: if "rotation" in kwargs: raise NotImplementedError("'rotation' is not supported for Bounded grids") self._data = data if prevent_copy else data.copy() self._bounds = bounds self.nodata_value = nodata_value super(BoundedGrid, self).__init__(*args, **kwargs) @property def data(self): return self._data @data.setter def data(self, data): new_data = numpy.array(data) if new_data.dtype.name == "object": raise TypeError( f"Data cannot be interpreted as a numpy.ndarray, got {type(data)}" ) if new_data.shape != self.data.shape: raise ValueError( f"Cannot set data that is different in size. Expected a shape of {self.data.shape}, got {new_data.shape}." ) self._data = data def __array__(self, dtype=None): if dtype: return self.data.astype(dtype) return self.data @property def dtype(self): return self._data.dtype
[docs] def astype(self, dtype): return self.update(new_data=self._data.astype(dtype))
[docs] def update(self, new_data, bounds=None, crs=None, nodata_value=None): # TODO figure out how to update dx, dy, origin if not bounds: bounds = self.bounds if not crs: crs = self.crs if not nodata_value: nodata_value = self.nodata_value return self.__class__( new_data, bounds=bounds, crs=crs, nodata_value=nodata_value )
[docs] def copy(self): return self.update(self.data)
@property def bounds(self) -> tuple: """Raster Bounds Returns ------- :class:`tuple` The bounds of the data in (left, bottom, right, top) or equivalently (min-x, min-y, max-x, max-y) """ return self._bounds @property def mpl_extent(self) -> tuple: """Raster Bounds Returns ------- :class:`tuple` The extent of the data as defined expected by matplotlib in (left, right, bottom, top) or equivalently (min-x, max-x, min-y, max-y) """ b = self._bounds return (b[0], b[2], b[1], b[3]) @property def corners(self): b = self.bounds return numpy.array( [ [b[0], b[3]], # left-top [b[2], b[3]], # right-top [b[2], b[1]], # right-bottom [b[0], b[1]], # left-bottom ] ) @property def width(self): """Raster width Returns ------- :class:`int` The number of grid cells in x-direction """ return self._data.shape[-1] @property def height(self): """Raster height Returns ------- :class:`int` The number of grid cells in y-direction """ return self._data.shape[-2] @property def cellsize(self): """Get the gridsize in (dx, dy)""" return (self.dx, self.dy) @property def nr_cells(self): """Number of cells Returns ------- :class:`int` The total number of cells in the grid """ return self.height * self.width
[docs] def intersects(self, other): other_bounds = ( other.bounds if isinstance(other, BaseGrid) else other ) # Allow for both grid objects and bounds return not ( self.bounds[0] >= other_bounds[2] or self.bounds[2] <= other_bounds[0] or self.bounds[1] >= other_bounds[3] or self.bounds[3] <= other_bounds[1] )
def _mask_to_index(self, mask): if not self._data.shape == mask.shape: raise ValueError( f"Mask shape {mask.shape} does not match data shape {self._data.shape}" ) ids = self.cells_in_bounds(self.bounds) return GridIndex(ids[mask])
[docs] def shared_bounds(self, other): other_bounds = other.bounds if isinstance(other, BaseGrid) else other if not self.intersects(other): raise IntersectionError( f"Grid with bounds {self.bounds} does not intersect with grid with bounds {other_bounds}." ) return ( max(self.bounds[0], other_bounds[0]), max(self.bounds[1], other_bounds[1]), min(self.bounds[2], other_bounds[2]), min(self.bounds[3], other_bounds[3]), )
[docs] def combined_bounds(self, other): other_bounds = other.bounds if isinstance(other, BaseGrid) else other return ( min(self.bounds[0], other_bounds[0]), min(self.bounds[1], other_bounds[1]), max(self.bounds[2], other_bounds[2]), max(self.bounds[3], other_bounds[3]), )
[docs] @abc.abstractmethod def crop(self, new_bounds, bounds_crs=None): """Cut out a slice of data contained within the supplied bounds. Parameters ---------- new_bounds: `Tuple(minx, miny, maxx, maxy)` The bounds defining the area to crop, in (minx, miny, maxx, maxy). bounds_crs: `pyproj.CRS` (optional) The bounds defining the extent of the cropped data. The value can be anything accepted by `pyproj.CRS.from_user_input()`. Returns ------- :class: `BoundedGrid` A BoundedGrid containing the data included in the cropped area contained within the bounds. """ pass
[docs] @abc.abstractmethod def intersecting_cells(self, other): pass
[docs] @abc.abstractmethod def numpy_id_to_grid_id(self, index): pass
[docs] @abc.abstractmethod def grid_id_to_numpy_id(self, index): pass
@property def indices(self): """Return the indices within the bounds of the data""" return self.cells_in_bounds(self.bounds)
[docs] def assign( self, data, *, anchor=None, bounds=None, in_place=True, assign_nodata=True ): if not any([anchor, bounds]): raise ValueError( "Please supply either an 'anchor' or 'bounds' keyword to position the data in the grid." ) new_data = self.data if in_place else self.data.copy() if bounds: slice_y, slice_x = self._data_slice_from_bounds(bounds) crop = new_data[slice_y, slice_x] if not assign_nodata: mask = data != self.nodata_value crop[mask] = data[mask] else: crop[:] = data elif anchor: # a corner or center raise NotImplementedError() else: raise ValueError("Please supply one of: {anchor, bounds}") return self.update(new_data)
[docs] @validate_index def value(self, index=None, oob_value=None): """Return the value at the given cell index. Parameters ---------- index: :class:`.GridIndex` (optional) The index of the cell(s) of which to return the value. If not supplied, all values will be returned in a flattened array. oob_value: :class:`float` (optional) The value assigned to values that are 'out of bounds'. I.e. the value assigned to ``index`` entries not covered by the data. Default: numpy.nan Returns ------- :class:`numpy.ndarray` The values at the supplied `index` locations """ if index is None: index = self.indices # Convert grid-ids into numpy-ids np_id = numpy.stack(self.grid_id_to_numpy_id(index.ravel())[::-1]) if np_id.ndim == 1: np_id = np_id[:, numpy.newaxis] # Identify any id outside the bounds oob_mask = numpy.where(np_id[0] >= self._data.shape[1]) oob_mask += numpy.where(np_id[0] < 0) oob_mask += numpy.where(np_id[1] >= self._data.shape[0]) oob_mask += numpy.where(np_id[1] < 0) oob_mask = numpy.hstack(oob_mask) if numpy.any( oob_mask ): # make sure we know what nodata value to set if ids are out of bounds if oob_value is None and self.nodata_value is None: raise ValueError( "Some indices do not have data. Please remove these ids, set a 'nodata_value' or supply an 'oob_value'." ) oob_value = oob_value if oob_value else self.nodata_value else: oob_value = ( oob_value if oob_value else 0 ) # the oob_value does not matter if no ids are out of bounds # Return array's `dtype` needs to be float instead of integer if an id falls outside of bounds # For NaNs don't make sense as integer if ( numpy.any(oob_mask) and not numpy.isfinite(oob_value) and not numpy.issubdtype(self._data.dtype, numpy.floating) ): print( f"Warning: dtype `{self._data.dtype}` might not support an `oob_value` of `{oob_value}`." ) values = numpy.full(np_id.shape[1], oob_value, dtype=self._data.dtype) sample_mask = numpy.ones_like(values, dtype="bool") sample_mask[oob_mask] = False np_id = np_id[:, sample_mask] values[sample_mask] = self._data[np_id[1], np_id[0]] return values.reshape(index.shape)
[docs] def nodata(self): if self.nodata_value is None: return None return self == self.nodata_value
[docs] def percentile(self, value): return numpy.percentile(self, value)
def _data_slice_from_bounds(self, bounds): if not self.are_bounds_aligned(bounds): raise ValueError( f"Cannot create slice from unaligned bounds {tuple(bounds)}" ) difference_left = round(abs((self.bounds[0] - bounds[0]) / self.dx)) difference_right = round(abs((self.bounds[2] - bounds[2]) / self.dx)) slice_x = slice( difference_left, self.width - difference_right, # add one for upper bound of slice is exclusive ) difference_bottom = round(abs((self.bounds[1] - bounds[1]) / self.dy)) difference_top = round(abs((self.bounds[3] - bounds[3]) / self.dy)) slice_y = slice( difference_top, self.height - difference_bottom, # add one for upper bound of slice is exclusive ) return slice_y, slice_x
[docs] def interp_nodata(self, method="linear", in_place=False): """Interpolate the cells containing nodata, if they are inside the convex hull of cells that do contain data. Parameters ---------- method: :class:`str` The interpolation method to be used. Options are ("nearest", "linear", "cubic"). Default: "linear". in_place: :class:`bool` Boolean flag determining whether to fill the nodata values in-place. In-memory values are modified if in_place is Ture. A copy of the grid is made where nodata values are filled if in_place is False. Returns ------- :class:`tuple` A copy of the grid with interpolated values. """ method_lut = dict( nearest=scipy.interpolate.NearestNDInterpolator, linear=functools.partial( scipy.interpolate.LinearNDInterpolator, fill_value=self.nodata_value ), cubic=scipy.interpolate.CloughTocher2DInterpolator, ) if self.nodata_value is None: raise ValueError( f"Cannot interpolate nodata values if attribute 'nodata_value' is not set." ) if method not in method_lut: raise ValueError( f"Method '{method}' is not supported. Supported methods: {method_lut.keys()}" ) if not in_place: self = self.copy() interp_func = method_lut[method] values = self.data.ravel() nodata_mask = values == self.nodata_value points = self.centroid().reshape(-1, 2) interpolator = interp_func( points[~nodata_mask], values[~nodata_mask], ) filled_values = interpolator(points[nodata_mask]) self.data.ravel()[nodata_mask] = filled_values return self
[docs] def interpolate( self, sample_points, method: Literal["nearest", "bilinear", "inverse_distance"] = "nearest", **interp_kwargs, ): """Interpolate the value at the location of ``sample_points``. Points that are outside of the bounds of the data are assigned `self.nodata_value`, or 'NaN' if no nodata value is set. Parameters ---------- sample_points: :class:`numpy.ndarray` The coordinates of the points at which to sample the data method: :class:`str`, `'nearest', 'bilinear'`, optional The interpolation method used to determine the value at the supplied `sample_points`. Supported methods: - "nearest", for nearest neigbour interpolation, effectively sampling the value of the data cell containing the point - "bilinear", linear interpolation using the four cells surrounding the point - "inverse_distance", weighted inverse distance using the 4,3,6 nearby cells surrounding the point for Rect, Hex and Rect grid respectively. Default: "nearest" **interp_kwargs: `dict` The keyword argument passed to the interpolation function corresponding to the specified `method` Returns ------- :class:`numpy.ndarray` The interpolated values at the supplied points See also -------- :py:meth:`.BoundedGrid.resample` :py:meth:`.BaseGrid.interp_from_points` """ if method == "nearest": new_ids = self.cell_at_point(sample_points) return self.value(new_ids) elif method == "bilinear" or method == "linear": return_shape = sample_points.shape[:-1] result = self._bilinear_interpolation(sample_points.reshape(-1, 2)) return result.reshape(return_shape) elif method == "inverse_distance": return self._inverse_distance_interpolation(sample_points, **interp_kwargs) raise ValueError(f"Resampling method '{method}' is not supported.")
def _inverse_distance_interpolation( self, sample_points, max_nr_nans=0, decay_constant=1 ): if not isinstance(sample_points, numpy.ndarray): sample_points = numpy.array(sample_points) original_shape = sample_points.shape if not original_shape[-1] == 2: raise ValueError( f"Expected the last axis of sample_points to have two elements (x,y). Got {original_shape[-1]} elements" ) sample_points = sample_points.reshape(-1, 2) nearby_cells = self.cells_near_point(sample_points) # FIXME: needs to be conform with tri-grid # nearby_cells = nearby_cells.index.swapaxes(0,1) # for hexagons and rectangles nearby_values = self.value(nearby_cells) nearby_centroids = self.centroid( nearby_cells ) # shape is (points[N], cells[6], xy[2]) # swap axes to match dimensions to sample points nearby_centroids = numpy.swapaxes( nearby_centroids, 0, 1 ) # shape is (cells[6], points[N], xy[2]) point_corner_vec = ( nearby_centroids - sample_points ) # shape is (cells[6], points[N], xy[2]) distances = numpy.linalg.norm( point_corner_vec, axis=-1 ) # compute distance over xy vector # TODO: allow for different weighting equations, sch as Shepard's interpolation with different power parameters # weights = ((2*self.r) / distances) weights = numpy.exp(-((distances / decay_constant) ** 2)) weights = weights / numpy.sum(weights, axis=0) # breakpoint() # swap axes back to match dimensions to nearby_values weights = numpy.swapaxes(weights, 0, 1) # shape is (points[N], cells[6]) # TODO: determine mask based on number of nans. # The edge might have 4 nans and 2 values where the point is outside of the bounds. # This might lead to edge effects. result = numpy.nansum(weights * nearby_values, axis=1) nr_nans = (~numpy.isfinite(nearby_values)).sum(axis=1) nan_mask = nr_nans > max_nr_nans result[nan_mask] = numpy.nan return result.reshape(original_shape[:-1])
[docs] def resample(self, alignment_grid, method="nearest", **interp_kwargs): """Resample the grid onto another grid. This will take the locations of the grid cells of the other grid (here called ``alignment_grid``) and determine the value on these location based on the values of the original grid (``self``). The steps are as follows: 1. Transform the bounds of the original data to the CRS of the alignment grid (if not already the same) No transformation is done if any of the grids has no CRS set. 2. Find the cells of the alignment grid within these transformed bounds 3. Find the cells of the original grid that are nearby each of the centroids of the cells found in 2. How many nearby cells are selected depends on the selected ``method`` 4. Interpolate the values using the supplied ``method`` at each of the centroids of the alignment grid cells selected in 2. 5. Create a new bounded grid using the attributes of the alignment grid Parameters ---------- alignment_grid: :class:`.BaseGrid` The grid with the desired attributes on which to resample. method: :class:`str`, `'nearest', 'bilinear', 'inverse_distance'`, optional The interpolation method used to determine the value at the supplied `sample_points`. Supported methods: - "nearest", for nearest neigbour interpolation, effectively sampling the value of the data cell containing the point - "bilinear", linear interpolation using the 4,3,6 nearby cells surrounding the point for Rect, Hex and Rect grid respectively. - "inverse_distance", weighted inverse distance using the 4,3,6 nearby cells surrounding the point for Rect, Hex and Rect grid respectively. Default: "nearest" **interp_kwargs: `dict` The keyword argument passed to the interpolation function corresponding to the specified `method` Returns ------- :class:`.BoundedGrid` The interpolated values at the supplied points See also -------- :py:meth:`.BoundedGrid.interpolate` :py:meth:`.BaseGrid.interp_from_points` """ if self.crs is None or alignment_grid.crs is None: warnings.warn( "`crs` not set for one or both grids. Assuming both grids have an identical CRS." ) different_crs = False else: different_crs = not self.crs.is_exact_same(alignment_grid.crs) # make sure the bounds align with the grid if different_crs: transformer = Transformer.from_crs( self.crs, alignment_grid.crs, always_xy=True ) bounds = transformer.transform_bounds(*self.bounds) else: bounds = self.bounds # Align using "contract" for we cannot sample outside of the original bounds new_bounds = alignment_grid.align_bounds(bounds, mode="contract") new_ids, shape = alignment_grid.cells_in_bounds( bounds=new_bounds, return_cell_count=True ) new_points = alignment_grid.centroid(new_ids) if different_crs: transformer = Transformer.from_crs( alignment_grid.crs, self.crs, always_xy=True ) original_shape = new_points.shape raveled_new_points = new_points.reshape(-1, 2) transformed_points = transformer.transform(*raveled_new_points.T) new_points = numpy.vstack(transformed_points).T.reshape(original_shape) value = self.interpolate(new_points, method=method, **interp_kwargs) # If value id 1D, turn into 2D # Take into account if the 1D line of cells runs in x or y direction if 1 in shape: empty_axis = 0 if shape[0] == 1 else 1 value = numpy.expand_dims(value, axis=empty_axis) nodata_value = self.nodata_value if self.nodata_value is not None else numpy.nan grid_kwargs = dict( data=value, bounds=new_bounds, crs=alignment_grid.crs, nodata_value=nodata_value, ) if hasattr(alignment_grid, "_shape"): grid_kwargs["shape"] = alignment_grid._shape new_grid = alignment_grid.bounded_cls(**grid_kwargs) return new_grid