tiler

Tiler teaser image

CI Code style: black Coverage status PyPI version

Github repository | Github issues | Documentation


⚠️ Please note: work in progress, things will change and/or break! ⚠️


This python package provides consistent and user-friendly functions for tiling/patching and subsequent merging of NumPy arrays.

Such tiling is often required for various heavy image-processing tasks such as semantic segmentation in deep learning, especially in domains where images do not fit into GPU memory (e.g., hyperspectral satellite images, whole slide images, videos, tomography data).

Please see Quick start section.
If you want to use tiler interactively, I highly recommend napari and napari-tiler plugin.

Features

  • N-dimensional
  • Optional in-place tiling
  • Optional channel dimension (dimension that is not tiled)
  • Optional tile batching
  • Tile overlapping
  • Access individual tiles with an iterator or a getter
  • Tile merging, with optional window functions/tapering

Quick start

You can find more examples in examples.
For more Tiler and Merger functionality, please check documentation.

import numpy as np
from tiler import Tiler, Merger

image = np.random.random((3, 1920, 1080))

# Setup tiling parameters
tiler = Tiler(data_shape=image.shape,
              tile_shape=(3, 250, 250),
              channel_dimension=0)

## Access tiles:
# 1. with an iterator
for tile_id, tile in tiler.iterate(image):
   print(f'Tile {tile_id} out of {len(tiler)} tiles.')
# 1b. the iterator can also be accessed through __call__
for tile_id, tile in tiler(image):
   print(f'Tile {tile_id} out of {len(tiler)} tiles.')
# 2. individually
tile_3 = tiler.get_tile(image, 3)
# 3. in batches
tiles_in_batches = [batch for _, batch in tiler(image, batch_size=10)]

# Setup merging parameters
merger = Merger(tiler)

## Merge tiles:
# 1. one by one
for tile_id, tile in tiler(image):
   merger.add(tile_id, some_processing_fn(tile))
# 2. in batches
merger.reset()
for batch_id, batch in tiler(image, batch_size=10):
   merger.add_batch(batch_id, 10, batch)

# Final merging: applies tapering and optional unpadding
final_image = merger.merge(unpad=True)  # (3, 1920, 1080)

Installation

The latest release is available through pip:

pip install tiler

Alternatively, you can clone the repository and install it manually:

git clone git@github.com:the-lay/tiler.git
cd tiler
pip install

If you are planning to contribute, please take a look at the contribution instructions.

Motivation & other packages

I work on semantic segmentation of patched 3D data and I often found myself reusing tiling functions that I wrote for the previous projects. No existing libraries listed below fit my use case, so that's why I wrote this library.

However, other libraries/examples might fit you better:

Moreover, some related approaches have been described in the literature:

Frequently asked questions

This section is a work in progress.

How do I create tiles with less dimensions than the data array?

Tiler expects tile_shape to have less than or the same number of elements as data_shape. If tile_shape has less elements than data_shape, tile_shape will be prepended with ones to match the size of data_shape.
For example, if you want to get 2d tiles out from 3d array you can initialize Tiler like this: Tiler(data_shape=(128,128,128), tile_shape=(128, 128)) and it will be equivalent to Tiler(data_shape=(128,128,128), tile_shape=(1, 128, 128)).

View Source
import sys
from tiler.tiler import Tiler
from tiler.merger import Merger


from pkg_resources import get_distribution, DistributionNotFound

try:
    VERSION = get_distribution(__name__).version
except DistributionNotFound:  # pragma: no cover
    try:
        from .version import version as VERSION  # noqa
    except ImportError:  # pragma: no cover
        raise ImportError(
            "Failed to find (autogenerated) version.py. "
            "This might be because you are installing from GitHub's tarballs, "
            "use the PyPI ones."
        )
__version__ = VERSION

__all__ = ["Tiler", "Merger"]

# Import README file as a module general docstring, only when generating documentation
# We also modify it to make it prettier
if "pdoc" in sys.modules:  # pragma: no cover
    with open("README.md", "r") as f:
        _readme = f.read()

        # remove baby logo and header
        _readme = _readme.split("\n", 2)[2]

        # replace teaser image path
        _readme = _readme.replace("misc/teaser/tiler_teaser.png", "tiler_teaser.png")
        _readme = _readme.replace("misc/baby_logo.png", "baby_logo.png")
        __doc__ = _readme
#   class Tiler:
View Source
class Tiler:
    TILING_MODES = ["constant", "drop", "irregular", "reflect", "edge", "wrap"]
    r"""
    Supported tiling modes:
    - `constant` (default)  
        If a tile is smaller than `tile_shape`, pad it with the constant value along each axis to match `tile_shape`.
        Set the value with the keyword `constant_value`.  
    - `drop`  
        If a tile is smaller than `tile_shape` in any of the dimensions, ignore it. Can result in zero tiles.
    - `irregular`  
        Allow tiles to be smaller than `tile_shape`.
    - `reflect`  
        If a tile is smaller than `tile_shape`,
        pad it with the reflection of values along each axis to match `tile_shape`.
    - `edge`  
        If a tile is smaller than `tile_shape`,
        pad it with the edge values of data along each axis to match `tile_shape`.
    - `wrap`  
        If a tile is smaller than `tile_shape`,
        pad it with the wrap of the vector along each axis to match `tile_shape`.
        The first values are used to pad the end and the end values are used to pad the beginning.
    """

    def __init__(
        self,
        data_shape: Union[Tuple, List, np.ndarray],
        tile_shape: Union[Tuple, List, np.ndarray],
        overlap: Union[int, float, Tuple, List, np.ndarray] = 0,
        channel_dimension: Optional[int] = None,
        mode: str = "constant",
        constant_value: float = 0.0,
    ):
        """Tiler class precomputes everything for tiling with specified parameters, without actually slicing data.
        You can access tiles individually with `Tiler.get_tile()` or with an iterator, both individually and in batches,
        with `Tiler.iterate()` (or the alias `Tiler.__call__()`).

        Args:
            data_shape (tuple, list or np.ndarray): Input data shape, e.g. (1920, 1080, 3), [512, 512, 512] or np.ndarray([3, 1024, 768]).
                If there is a channel dimension, it should be included in the shape.

            tile_shape (tuple, list or np.ndarray): Shape of a tile, e.g. (256, 256, 3), [64, 64, 64] or np.ndarray([3, 128, 128]).
                Tile must have the same number of dimensions as data or less.
                If less, the shape will be automatically prepended with ones to match data_shape size.

            overlap (int, float, tuple, list or np.ndarray): Specifies overlap between tiles.
                If integer, the same overlap of overlap pixels applied in each dimension, except channel_dimension.
                If float, percentage of a tile_shape to overlap (from 0.0 to 1.0), except channel_dimension.
                If tuple, list or np.ndarray, explicit size of the overlap (must be smaller than tile_shape in each dimension).
                Default is `0`.

            channel_dimension (int, optional): Specifies which axis is the channel dimension that will not be tiled.
                Usually it is the last or the first dimension of the array.
                Negative indexing (`-len(data_shape)` to `-1` inclusive) is allowed.
                Default is `None`, no channel dimension in the data.

            mode (str): Defines how the data will be tiled.
                Must be one of the supported `Tiler.TILING_MODES`. Defaults to `"constant"`.

            constant_value (float): Specifies the value of padding when `mode='constant'`.
                Default is `0.0`.
        """

        self.recalculate(
            data_shape=data_shape,
            tile_shape=tile_shape,
            overlap=overlap,
            channel_dimension=channel_dimension,
            mode=mode,
            constant_value=constant_value,
        )

    def recalculate(
        self,
        data_shape: Optional[Union[Tuple, List, np.ndarray]] = None,
        tile_shape: Optional[Union[Tuple, List, np.ndarray]] = None,
        overlap: Optional[Union[int, float, Tuple, List, np.ndarray]] = None,
        channel_dimension: Optional[int] = None,
        mode: Optional[str] = None,
        constant_value: Optional[float] = None,
    ) -> None:
        """Recalculates tiling for new given settings.
        If a passed value is None, use previously given value.

        For more information about each argument see `Tiler.__init__()` documentation.
        """

        # Data and tile shapes
        if data_shape is not None:
            self.data_shape = np.asarray(data_shape, dtype=np.int64)
        if tile_shape is not None:
            self.tile_shape = np.atleast_1d(np.asarray(tile_shape, dtype=np.int64))

            # Append ones to match data_shape size
            if self.tile_shape.size <= self.data_shape.size:
                size_difference = self.data_shape.size - self.tile_shape.size
                self.tile_shape = np.insert(
                    arr=self.tile_shape, obj=0, values=np.ones(size_difference), axis=0
                )
                warnings.warn(
                    f"Tiler automatically adjusted tile_shape from {tuple(tile_shape)} to {tuple(self.tile_shape)}."
                )
        self._n_dim: int = len(self.data_shape)
        if (self.tile_shape <= 0).any() or (self.data_shape <= 0).any():
            raise ValueError(
                "Tile and data shapes must be tuple, list or ndarray of positive integers."
            )
        if self.tile_shape.size != self.data_shape.size:
            raise ValueError(
                "Tile shape must have less or equal number of elements compared to the data shape. "
                "If less, your tile shape will be prepended with ones to match the data shape, "
                "e.g. data_shape=(28, 28), tile_shape=(28) -> tile_shape=(1, 28)."
            )

        # Tiling mode
        if mode is not None:
            self.mode = mode
        if self.mode not in self.TILING_MODES:
            raise ValueError(
                f"{self.mode} is an unsupported tiling mode, please check the documentation."
            )

        # Constant value used for constant tiling mode
        if constant_value is not None:
            self.constant_value = constant_value

        # Channel dimension
        # Channel dimension can be None which means we need to check for init too
        if not hasattr(self, "channel_dimension") or channel_dimension is not None:
            self.channel_dimension = channel_dimension
        if self.channel_dimension:
            if (self.channel_dimension >= self._n_dim) or (
                self.channel_dimension < -self._n_dim
            ):
                raise ValueError(
                    f"Specified channel dimension is out of bounds "
                    f"(should be None or an integer from {-self._n_dim} to {self._n_dim - 1})."
                )
            if self.channel_dimension < 0:
                # negative indexing
                self.channel_dimension = self._n_dim + self.channel_dimension

        # Overlap and step
        if overlap is not None:
            self.overlap = overlap
        if isinstance(self.overlap, float):
            if self.overlap < 0 or self.overlap > 1.0:
                raise ValueError(
                    "Float overlap must be in range of 0.0 (0%) to 1.0 (100%)."
                )

            self._tile_overlap: np.ndarray = np.ceil(
                self.overlap * self.tile_shape
            ).astype(int)
            if self.channel_dimension is not None:
                self._tile_overlap[self.channel_dimension] = 0

        elif isinstance(self.overlap, int):
            tile_shape_without_channel = self.tile_shape[
                np.arange(self._n_dim) != self.channel_dimension
            ]
            if self.overlap < 0 or np.any(self.overlap >= tile_shape_without_channel):
                raise ValueError(
                    f"Integer overlap must be in range of 0 to {np.max(tile_shape_without_channel)}"
                )

            self._tile_overlap: np.ndarray = np.array(
                [self.overlap for _ in self.tile_shape]
            )
            if self.channel_dimension is not None:
                self._tile_overlap[self.channel_dimension] = 0

        elif (
            isinstance(self.overlap, list)
            or isinstance(self.overlap, tuple)
            or isinstance(self.overlap, np.ndarray)
        ):
            if np.any(np.array(self.overlap) < 0) or np.any(
                self.overlap >= self.tile_shape
            ):
                raise ValueError("Overlap size must be smaller than tile_shape.")

            self._tile_overlap: np.ndarray = np.array(self.overlap).astype(int)

        else:
            raise ValueError(
                "Unsupported overlap mode (not float, int, list, tuple or np.ndarray)."
            )

        self._tile_step: np.ndarray = (self.tile_shape - self._tile_overlap).astype(
            int
        )  # tile step

        # Calculate mosaic (collection of tiles) shape
        div, mod = np.divmod(
            [self.data_shape[d] - self._tile_overlap[d] for d in range(self._n_dim)],
            self._tile_step,
        )
        if self.mode == "drop":
            self._indexing_shape = div
        else:
            self._indexing_shape = div + (mod != 0)
        if self.channel_dimension is not None:
            self._indexing_shape[self.channel_dimension] = 1

        # Calculate new shape assuming tiles are padded
        if self.mode == "irregular":
            self._new_shape = self.data_shape
        else:
            self._new_shape = (
                self._indexing_shape * self._tile_step
            ) + self._tile_overlap
        self._shape_diff = self._new_shape - self.data_shape
        if self.channel_dimension is not None:
            self._shape_diff[self.channel_dimension] = 0

        # If channel dimension is given, set tile_step of that dimension to 0
        if self.channel_dimension is not None:
            self._tile_step[self.channel_dimension] = 0

        # Tile indexing
        self._tile_index = np.vstack(
            np.meshgrid(*[np.arange(0, x) for x in self._indexing_shape], indexing="ij")
        )
        self._tile_index = self._tile_index.reshape(self._n_dim, -1).T
        self.n_tiles = len(self._tile_index)

        if self.n_tiles == 0:
            warnings.warn(
                f"Tiler (mode={mode}, overlap={overlap}) will split data_shape {data_shape} "
                f"into zero tiles (tile_shape={tile_shape})."
            )

    def __len__(self) -> int:
        """
        Returns:
             int: Number of tiles in the mosaic.
        """
        return self.n_tiles

    def __repr__(self) -> str:
        """
        Returns:
            str: String representation of the object.
        """
        return (
            f"Tiler split {list(self.data_shape)} data into {len(self)} tiles of {list(self.tile_shape)}."
            f"\n\tMosaic shape: {list(self._indexing_shape)}"
            f"\n\tPadded shape: {list(self._new_shape)}"
            f"\n\tTile overlap: {self.overlap}"
            f"\n\tElement step: {list(self._tile_step)}"
            f"\n\tMode: {self.mode}"
            f"\n\tChannel dimension: {self.channel_dimension}"
        )

    def __call__(
        self,
        data: Union[np.ndarray, Callable[..., np.ndarray]],
        progress_bar: bool = False,
        batch_size: int = 0,
        drop_last: bool = False,
        copy_data: bool = True,
    ) -> Generator[Tuple[int, np.ndarray], None, None]:
        """Alias for `Tiler.iterate()`"""
        return self.iterate(data, progress_bar, batch_size, drop_last, copy_data)

    def iterate(
        self,
        data: Union[np.ndarray, Callable[..., np.ndarray]],
        progress_bar: bool = False,
        batch_size: int = 0,
        drop_last: bool = False,
        copy_data: bool = True,
    ) -> Generator[Tuple[int, np.ndarray], None, None]:
        """Iterates through tiles of the given data array. This method can also be accessed by `Tiler.__call__()`.

        Args:
            data (np.ndarray or callable): The data array on which the tiling will be performed. A callable can be
                supplied to load data into memory instead of slicing from an array. The callable should take integers
                as input, the smallest tile corner coordinates and tile size in each dimension, and output numpy array.

                e.g.
                *python-bioformats*
                ```python
                >>> tileSize = 2000
                >>> tiler = Tiler((sizeX, sizeY, sizeC), (tileSize, tileSize, sizeC))
                >>> def reader_func(*args):
                >>>     X, Y, W, H = args[0], args[1], args[3], args[4]
                >>>     return reader.read(XYWH=[X, Y, W, H])
                >>> for t_id, tile in tiler.iterate(reader_func):
                >>>     pass
                ```
                *open-slide*
                ```python
                >>> reader_func = lambda *args: wsi.read_region([args[0], args[1]], 0, [args[3], args[4]])
                >>> for t_id, tile in tiler.iterate(reader_func):
                >>>     pass
                ```

            progress_bar (bool): Specifies whether to show the progress bar or not.
                Uses `tqdm` package.
                Default is `False`.

            batch_size (int): Specifies returned batch size.
                If `batch_size == 0`, return one tile at a time.
                If `batch_size >= 1`, return in batches (returned shape: `[batch_size, *tile_shape]`).
                Default is 0.

            drop_last (bool): Specifies whether to drop last non-full batch.
                Used only when batch_size > 0.
                Default is False.

            copy_data (bool): Specifies whether to copy the tile before returning it.
                If `copy_data == False`, returns a view.
                Default is True.

        Yields:
            (int, np.ndarray): Tuple with integer tile number and array tile data.
        """

        if batch_size < 0:
            raise ValueError(f"Batch size must >= 0, not {batch_size}")

        # return a tile at a time
        if batch_size == 0:
            for tile_i in tqdm(
                range(self.n_tiles), disable=not progress_bar, unit=" tiles"
            ):
                yield tile_i, self.get_tile(data, tile_i, copy_data=copy_data)

        # return in batches
        if batch_size > 0:
            # check for drop_last
            length = (
                (self.n_tiles - (self.n_tiles % batch_size))
                if drop_last
                else self.n_tiles
            )

            for tile_i in tqdm(
                range(0, length, batch_size), disable=not progress_bar, unit=" batches"
            ):
                tiles = np.stack(
                    [
                        self.get_tile(data, x, copy_data=copy_data)
                        for x in range(tile_i, min(tile_i + batch_size, length))
                    ]
                )
                yield tile_i // batch_size, tiles

    def get_tile(
        self,
        data: Union[np.ndarray, Callable[..., np.ndarray]],
        tile_id: int,
        copy_data: bool = True,
    ) -> np.ndarray:
        """Returns an individual tile.

        Args:
            data (np.ndarray or callable): Data from which `tile_id`-th tile will be taken. A callable can be
                supplied to load data into memory instead of slicing from an array. The callable should take integers
                as input, the smallest tile corner coordinates and tile size in each dimension, and output numpy array.

                e.g.
                *python-bioformats*
                ```python
                >>> tileSize = 2000
                >>> tiler = Tiler((sizeX, sizeY, sizeC), (tileSize, tileSize, sizeC))
                >>> def reader_func(*args):
                >>>     X, Y, W, H = args[0], args[1], args[3], args[4]
                >>>     return reader.read(XYWH=[X, Y, W, H])
                >>> tiler.get_tile(reader_func, 0)
                ```
                *open-slide*
                ```python
                >>> reader_func = lambda *args: wsi.read_region([args[0], args[1]], 0, [args[3], args[4]])
                >>> tiler.get_tile(reader_func, 0)
                ```

            tile_id (int): Specifies which tile to return. Must be smaller than the total number of tiles.

            copy_data (bool): Specifies whether returned tile is a copy.
                If `copy_data == False` returns a view.
                Default is True.

        Returns:
            np.ndarray: Content of tile number `tile_id`, padded if necessary.
        """

        if (tile_id < 0) or (tile_id >= self.n_tiles):
            raise IndexError(
                f"Out of bounds, there is no tile {tile_id}."
                f"There are {len(self) - 1} tiles, starting from index 0."
            )

        if (
            isinstance(data, np.ndarray)
            and np.not_equal(data.shape, self.data_shape).any()
        ):
            raise ValueError(
                f"Shape of provided data array ({data.shape}) does not match "
                f"same as Tiler's data_shape ({tuple(self.data_shape)})."
            )

        # get tile data
        tile_corner = self._tile_index[tile_id] * self._tile_step
        # take the lesser of the tile shape and the distance to the edge
        sampling = [
            slice(
                tile_corner[d],
                np.min([self.data_shape[d], tile_corner[d] + self.tile_shape[d]]),
            )
            for d in range(self._n_dim)
        ]

        if callable(data):
            sampling = [x.stop - x.start for x in sampling]
            tile_data = data(*tile_corner, *sampling)
        else:
            tile_data = data[tuple(sampling)]

        if copy_data:
            tile_data = tile_data.copy()

        shape_diff = self.tile_shape - tile_data.shape
        if (self.mode != "irregular") and np.any(shape_diff > 0):
            if self.mode == "constant":
                tile_data = np.pad(
                    tile_data,
                    list((0, diff) for diff in shape_diff),
                    mode=self.mode,
                    constant_values=self.constant_value,
                )
            elif self.mode == "reflect" or self.mode == "edge" or self.mode == "wrap":
                tile_data = np.pad(
                    tile_data, list((0, diff) for diff in shape_diff), mode=self.mode
                )

        return tile_data

    def get_all_tiles(
        self,
        data: Union[np.ndarray, Callable[..., np.ndarray]],
        axis: int = 0,
        copy_data: bool = True,
    ) -> np.ndarray:
        """Returns all tiles joined along a new axis. Does not work for `Tiler.mode = 'irregular'`.

        The `axis` parameter specifies the index of the new axis in the dimensions of the result.
        For example, if `axis=0` it will be the first dimension and if `axis=-1` it will be the last dimension.

        For more information about `data` and `copy_data` parameters, see `Tiler.get_tile()`.

        Args:
            data (np.ndarray or callable): Data which will be tiled. A callable can be supplied to load data into memory
                instead of slicing from an array. The callable should take integers as input, the smallest tile corner
                coordinates and tile size in each dimension, and output numpy array.

            axis (int): The axis in the result array along which the tiles are stacked.

            copy_data (bool): Specifies whether returned tile is a copy.
                If `copy_data == False` returns a view.
                Default is True.

        Returns:
            np.ndarray: All tiles stacked along a new axis.
        """

        if self.mode == "irregular":
            raise ValueError("get_all_tiles does not support irregular mode")

        return np.stack(
            [self.get_tile(data, x, copy_data=copy_data) for x in range(self.n_tiles)],
            axis=axis,
        )

    def get_tile_bbox(
        self,
        tile_id: int,
        with_channel_dim: bool = False,
        all_corners: bool = False,
    ) -> Union[Tuple[np.ndarray, np.ndarray], np.ndarray]:
        """Returns coordinates of the opposite corners of the bounding box (hyperrectangle?) of the tile on padded data.

        Args:
            tile_id (int): Specifies which tile's bounding coordinates will be returned.
                Must be between 0 and the total number of tiles.

            with_channel_dim (bool): Specifies whether to return shape with channel dimension or without.
                Default is False.

            all_corners (bool): If True, returns all vertices of the bounding box.
                Default is False.

        Returns:
            (np.ndarray, np.ndarray): Smallest (bottom-left) and largest (top-right) corners of the bounding box.

            np.ndarray: All corners of the bounding box, if `all_corners=True`.
        """

        if (tile_id < 0) or (tile_id >= self.n_tiles):
            raise IndexError(
                f"Out of bounds, there is no tile {tile_id}. "
                f"There are {len(self) - 1} tiles, starting from index 0."
            )

        # find min and max vertices
        bottom_left_corner = self._tile_step * self.get_tile_mosaic_position(
            tile_id, True
        )
        top_right_corner = bottom_left_corner + self.tile_shape

        # remove channel dimension if not required
        if self.channel_dimension is not None and not with_channel_dim:
            dim_indices = list(range(self.channel_dimension)) + list(
                range(self.channel_dimension + 1, len(self._tile_step))
            )
            bottom_left_corner = bottom_left_corner[dim_indices]
            top_right_corner = top_right_corner[dim_indices]

        # by default, return only min/max vertices
        if not all_corners:
            return bottom_left_corner, top_right_corner

        # otherwise, return all vertices of the bbox
        # inspired by https://stackoverflow.com/a/57065356/1668421
        # but instead create an indexing array from cartesian product of bits
        # and use it to sample intervals
        else:
            n_dim: int = len(bottom_left_corner)  # already channel_dimension adjusted
            mins = np.minimum(bottom_left_corner, top_right_corner)
            maxs = np.maximum(bottom_left_corner, top_right_corner)
            intervals = np.stack([mins, maxs], -1)
            indexing = np.array(list(itertools.product([0, 1], repeat=n_dim)))
            corners = np.stack([intervals[x][indexing.T[x]] for x in range(n_dim)], -1)
            return corners

    def get_tile_mosaic_position(
        self, tile_id: int, with_channel_dim: bool = False
    ) -> np.ndarray:
        """Returns tile position in the mosaic.

        Args:
          tile_id (int): Specifies which tile's mosaic position will be returned. \
            Must be smaller than the total number of tiles.

          with_channel_dim (bool): Specifies whether to return position with channel dimension or without.
            Default is False.

        Returns:
            np.ndarray: Tile mosaic position (tile position relative to other tiles).
        """
        if (tile_id < 0) or (tile_id >= self.n_tiles):
            raise IndexError(
                f"Out of bounds, there is no tile {tile_id}. "
                f"There are {len(self) - 1} tiles, starting from index 0."
            )

        if self.channel_dimension is not None and not with_channel_dim:
            return self._tile_index[tile_id][
                ~(np.arange(self._n_dim) == self.channel_dimension)
            ]
        return self._tile_index[tile_id]

    def get_mosaic_shape(self, with_channel_dim: bool = False) -> np.ndarray:
        """Returns mosaic shape.

        Args:
            with_channel_dim (bool):
                Specifies whether to return shape with channel dimension or without. Defaults to False.

        Returns:
            np.ndarray: Shape of tiles mosaic.
        """
        if self.channel_dimension is not None and not with_channel_dim:
            return self._indexing_shape[
                ~(np.arange(self._n_dim) == self.channel_dimension)
            ]
        return self._indexing_shape

    def calculate_padding(self) -> Tuple[np.ndarray, List[Tuple[int, int]]]:
        """Calculate a frame padding for the current Tiler parameters.
        The padding is overlap//2 or tile_step//2, whichever is bigger.
        The method returns a tuple (new_shape, padding) where padding is
        ((before_1, after_1), … (before_N, after_N)), unique pad widths for each axis N.

        In the usual workflow, you'd recalculate tiling settings and then apply padding, prior to tiling.
        Then when merging, pass padding to `Merger.merge(extra_padding=padding, ...)`:
        ```python
        >>> tiler = Tiler(...)
        >>> merger = Merger(tiler, ...)
        >>> new_shape, padding = tiler.calculate_padding()
        >>> tiler.recalculate(data_shape=new_shape)
        >>> padded_data = np.pad(data, pad_width=padding, mode="reflect")
        >>> for tile_id, tile in tiler(padded_data):
        >>>     processed_tile = process(tile)
        >>>     merger.add(tile_id, processed_tile)
        >>> final_image = merger.merge(extra_padding=padding)
        ```
        Return:
            np.ndarray: Resulting shape when padding is applied.

            List[Tuple[int, int]]: Calculated padding.
        """

        # Choosing padding
        pre_pad = np.maximum(self._tile_step // 2, self._tile_overlap // 2)
        post_pad = pre_pad + np.mod(self._tile_step, 2)

        new_shape = pre_pad + self.data_shape + post_pad
        padding = list(zip(pre_pad, post_pad))

        return new_shape, padding
#   Tiler( data_shape: Union[Tuple, List, numpy.ndarray], tile_shape: Union[Tuple, List, numpy.ndarray], overlap: Union[int, float, Tuple, List, numpy.ndarray] = 0, channel_dimension: Union[int, NoneType] = None, mode: str = 'constant', constant_value: float = 0.0 )
View Source
    def __init__(
        self,
        data_shape: Union[Tuple, List, np.ndarray],
        tile_shape: Union[Tuple, List, np.ndarray],
        overlap: Union[int, float, Tuple, List, np.ndarray] = 0,
        channel_dimension: Optional[int] = None,
        mode: str = "constant",
        constant_value: float = 0.0,
    ):
        """Tiler class precomputes everything for tiling with specified parameters, without actually slicing data.
        You can access tiles individually with `Tiler.get_tile()` or with an iterator, both individually and in batches,
        with `Tiler.iterate()` (or the alias `Tiler.__call__()`).

        Args:
            data_shape (tuple, list or np.ndarray): Input data shape, e.g. (1920, 1080, 3), [512, 512, 512] or np.ndarray([3, 1024, 768]).
                If there is a channel dimension, it should be included in the shape.

            tile_shape (tuple, list or np.ndarray): Shape of a tile, e.g. (256, 256, 3), [64, 64, 64] or np.ndarray([3, 128, 128]).
                Tile must have the same number of dimensions as data or less.
                If less, the shape will be automatically prepended with ones to match data_shape size.

            overlap (int, float, tuple, list or np.ndarray): Specifies overlap between tiles.
                If integer, the same overlap of overlap pixels applied in each dimension, except channel_dimension.
                If float, percentage of a tile_shape to overlap (from 0.0 to 1.0), except channel_dimension.
                If tuple, list or np.ndarray, explicit size of the overlap (must be smaller than tile_shape in each dimension).
                Default is `0`.

            channel_dimension (int, optional): Specifies which axis is the channel dimension that will not be tiled.
                Usually it is the last or the first dimension of the array.
                Negative indexing (`-len(data_shape)` to `-1` inclusive) is allowed.
                Default is `None`, no channel dimension in the data.

            mode (str): Defines how the data will be tiled.
                Must be one of the supported `Tiler.TILING_MODES`. Defaults to `"constant"`.

            constant_value (float): Specifies the value of padding when `mode='constant'`.
                Default is `0.0`.
        """

        self.recalculate(
            data_shape=data_shape,
            tile_shape=tile_shape,
            overlap=overlap,
            channel_dimension=channel_dimension,
            mode=mode,
            constant_value=constant_value,
        )

Tiler class precomputes everything for tiling with specified parameters, without actually slicing data. You can access tiles individually with Tiler.get_tile() or with an iterator, both individually and in batches, with Tiler.iterate() (or the alias Tiler.__call__()).

Args
  • data_shape (tuple, list or np.ndarray): Input data shape, e.g. (1920, 1080, 3), [512, 512, 512] or np.ndarray([3, 1024, 768]). If there is a channel dimension, it should be included in the shape.
  • tile_shape (tuple, list or np.ndarray): Shape of a tile, e.g. (256, 256, 3), [64, 64, 64] or np.ndarray([3, 128, 128]). Tile must have the same number of dimensions as data or less. If less, the shape will be automatically prepended with ones to match data_shape size.
  • overlap (int, float, tuple, list or np.ndarray): Specifies overlap between tiles. If integer, the same overlap of overlap pixels applied in each dimension, except channel_dimension. If float, percentage of a tile_shape to overlap (from 0.0 to 1.0), except channel_dimension. If tuple, list or np.ndarray, explicit size of the overlap (must be smaller than tile_shape in each dimension). Default is 0.
  • channel_dimension (int, optional): Specifies which axis is the channel dimension that will not be tiled. Usually it is the last or the first dimension of the array. Negative indexing (-len(data_shape) to -1 inclusive) is allowed. Default is None, no channel dimension in the data.
  • mode (str): Defines how the data will be tiled. Must be one of the supported Tiler.TILING_MODES. Defaults to "constant".
  • constant_value (float): Specifies the value of padding when mode='constant'. Default is 0.0.
#   TILING_MODES = ['constant', 'drop', 'irregular', 'reflect', 'edge', 'wrap']

Supported tiling modes:

  • constant (default)
    If a tile is smaller than tile_shape, pad it with the constant value along each axis to match tile_shape. Set the value with the keyword constant_value.
  • drop
    If a tile is smaller than tile_shape in any of the dimensions, ignore it. Can result in zero tiles.
  • irregular
    Allow tiles to be smaller than tile_shape.
  • reflect
    If a tile is smaller than tile_shape, pad it with the reflection of values along each axis to match tile_shape.
  • edge
    If a tile is smaller than tile_shape, pad it with the edge values of data along each axis to match tile_shape.
  • wrap
    If a tile is smaller than tile_shape, pad it with the wrap of the vector along each axis to match tile_shape. The first values are used to pad the end and the end values are used to pad the beginning.
#   def recalculate( self, data_shape: Union[Tuple, List, numpy.ndarray, NoneType] = None, tile_shape: Union[Tuple, List, numpy.ndarray, NoneType] = None, overlap: Union[int, float, Tuple, List, numpy.ndarray, NoneType] = None, channel_dimension: Union[int, NoneType] = None, mode: Union[str, NoneType] = None, constant_value: Union[float, NoneType] = None ) -> None:
View Source
    def recalculate(
        self,
        data_shape: Optional[Union[Tuple, List, np.ndarray]] = None,
        tile_shape: Optional[Union[Tuple, List, np.ndarray]] = None,
        overlap: Optional[Union[int, float, Tuple, List, np.ndarray]] = None,
        channel_dimension: Optional[int] = None,
        mode: Optional[str] = None,
        constant_value: Optional[float] = None,
    ) -> None:
        """Recalculates tiling for new given settings.
        If a passed value is None, use previously given value.

        For more information about each argument see `Tiler.__init__()` documentation.
        """

        # Data and tile shapes
        if data_shape is not None:
            self.data_shape = np.asarray(data_shape, dtype=np.int64)
        if tile_shape is not None:
            self.tile_shape = np.atleast_1d(np.asarray(tile_shape, dtype=np.int64))

            # Append ones to match data_shape size
            if self.tile_shape.size <= self.data_shape.size:
                size_difference = self.data_shape.size - self.tile_shape.size
                self.tile_shape = np.insert(
                    arr=self.tile_shape, obj=0, values=np.ones(size_difference), axis=0
                )
                warnings.warn(
                    f"Tiler automatically adjusted tile_shape from {tuple(tile_shape)} to {tuple(self.tile_shape)}."
                )
        self._n_dim: int = len(self.data_shape)
        if (self.tile_shape <= 0).any() or (self.data_shape <= 0).any():
            raise ValueError(
                "Tile and data shapes must be tuple, list or ndarray of positive integers."
            )
        if self.tile_shape.size != self.data_shape.size:
            raise ValueError(
                "Tile shape must have less or equal number of elements compared to the data shape. "
                "If less, your tile shape will be prepended with ones to match the data shape, "
                "e.g. data_shape=(28, 28), tile_shape=(28) -> tile_shape=(1, 28)."
            )

        # Tiling mode
        if mode is not None:
            self.mode = mode
        if self.mode not in self.TILING_MODES:
            raise ValueError(
                f"{self.mode} is an unsupported tiling mode, please check the documentation."
            )

        # Constant value used for constant tiling mode
        if constant_value is not None:
            self.constant_value = constant_value

        # Channel dimension
        # Channel dimension can be None which means we need to check for init too
        if not hasattr(self, "channel_dimension") or channel_dimension is not None:
            self.channel_dimension = channel_dimension
        if self.channel_dimension:
            if (self.channel_dimension >= self._n_dim) or (
                self.channel_dimension < -self._n_dim
            ):
                raise ValueError(
                    f"Specified channel dimension is out of bounds "
                    f"(should be None or an integer from {-self._n_dim} to {self._n_dim - 1})."
                )
            if self.channel_dimension < 0:
                # negative indexing
                self.channel_dimension = self._n_dim + self.channel_dimension

        # Overlap and step
        if overlap is not None:
            self.overlap = overlap
        if isinstance(self.overlap, float):
            if self.overlap < 0 or self.overlap > 1.0:
                raise ValueError(
                    "Float overlap must be in range of 0.0 (0%) to 1.0 (100%)."
                )

            self._tile_overlap: np.ndarray = np.ceil(
                self.overlap * self.tile_shape
            ).astype(int)
            if self.channel_dimension is not None:
                self._tile_overlap[self.channel_dimension] = 0

        elif isinstance(self.overlap, int):
            tile_shape_without_channel = self.tile_shape[
                np.arange(self._n_dim) != self.channel_dimension
            ]
            if self.overlap < 0 or np.any(self.overlap >= tile_shape_without_channel):
                raise ValueError(
                    f"Integer overlap must be in range of 0 to {np.max(tile_shape_without_channel)}"
                )

            self._tile_overlap: np.ndarray = np.array(
                [self.overlap for _ in self.tile_shape]
            )
            if self.channel_dimension is not None:
                self._tile_overlap[self.channel_dimension] = 0

        elif (
            isinstance(self.overlap, list)
            or isinstance(self.overlap, tuple)
            or isinstance(self.overlap, np.ndarray)
        ):
            if np.any(np.array(self.overlap) < 0) or np.any(
                self.overlap >= self.tile_shape
            ):
                raise ValueError("Overlap size must be smaller than tile_shape.")

            self._tile_overlap: np.ndarray = np.array(self.overlap).astype(int)

        else:
            raise ValueError(
                "Unsupported overlap mode (not float, int, list, tuple or np.ndarray)."
            )

        self._tile_step: np.ndarray = (self.tile_shape - self._tile_overlap).astype(
            int
        )  # tile step

        # Calculate mosaic (collection of tiles) shape
        div, mod = np.divmod(
            [self.data_shape[d] - self._tile_overlap[d] for d in range(self._n_dim)],
            self._tile_step,
        )
        if self.mode == "drop":
            self._indexing_shape = div
        else:
            self._indexing_shape = div + (mod != 0)
        if self.channel_dimension is not None:
            self._indexing_shape[self.channel_dimension] = 1

        # Calculate new shape assuming tiles are padded
        if self.mode == "irregular":
            self._new_shape = self.data_shape
        else:
            self._new_shape = (
                self._indexing_shape * self._tile_step
            ) + self._tile_overlap
        self._shape_diff = self._new_shape - self.data_shape
        if self.channel_dimension is not None:
            self._shape_diff[self.channel_dimension] = 0

        # If channel dimension is given, set tile_step of that dimension to 0
        if self.channel_dimension is not None:
            self._tile_step[self.channel_dimension] = 0

        # Tile indexing
        self._tile_index = np.vstack(
            np.meshgrid(*[np.arange(0, x) for x in self._indexing_shape], indexing="ij")
        )
        self._tile_index = self._tile_index.reshape(self._n_dim, -1).T
        self.n_tiles = len(self._tile_index)

        if self.n_tiles == 0:
            warnings.warn(
                f"Tiler (mode={mode}, overlap={overlap}) will split data_shape {data_shape} "
                f"into zero tiles (tile_shape={tile_shape})."
            )

Recalculates tiling for new given settings. If a passed value is None, use previously given value.

For more information about each argument see Tiler.__init__() documentation.

#   def iterate( self, data: Union[numpy.ndarray, Callable[..., numpy.ndarray]], progress_bar: bool = False, batch_size: int = 0, drop_last: bool = False, copy_data: bool = True ) -> Generator[Tuple[int, numpy.ndarray], NoneType, NoneType]:
View Source
    def iterate(
        self,
        data: Union[np.ndarray, Callable[..., np.ndarray]],
        progress_bar: bool = False,
        batch_size: int = 0,
        drop_last: bool = False,
        copy_data: bool = True,
    ) -> Generator[Tuple[int, np.ndarray], None, None]:
        """Iterates through tiles of the given data array. This method can also be accessed by `Tiler.__call__()`.

        Args:
            data (np.ndarray or callable): The data array on which the tiling will be performed. A callable can be
                supplied to load data into memory instead of slicing from an array. The callable should take integers
                as input, the smallest tile corner coordinates and tile size in each dimension, and output numpy array.

                e.g.
                *python-bioformats*
                ```python
                >>> tileSize = 2000
                >>> tiler = Tiler((sizeX, sizeY, sizeC), (tileSize, tileSize, sizeC))
                >>> def reader_func(*args):
                >>>     X, Y, W, H = args[0], args[1], args[3], args[4]
                >>>     return reader.read(XYWH=[X, Y, W, H])
                >>> for t_id, tile in tiler.iterate(reader_func):
                >>>     pass
                ```
                *open-slide*
                ```python
                >>> reader_func = lambda *args: wsi.read_region([args[0], args[1]], 0, [args[3], args[4]])
                >>> for t_id, tile in tiler.iterate(reader_func):
                >>>     pass
                ```

            progress_bar (bool): Specifies whether to show the progress bar or not.
                Uses `tqdm` package.
                Default is `False`.

            batch_size (int): Specifies returned batch size.
                If `batch_size == 0`, return one tile at a time.
                If `batch_size >= 1`, return in batches (returned shape: `[batch_size, *tile_shape]`).
                Default is 0.

            drop_last (bool): Specifies whether to drop last non-full batch.
                Used only when batch_size > 0.
                Default is False.

            copy_data (bool): Specifies whether to copy the tile before returning it.
                If `copy_data == False`, returns a view.
                Default is True.

        Yields:
            (int, np.ndarray): Tuple with integer tile number and array tile data.
        """

        if batch_size < 0:
            raise ValueError(f"Batch size must >= 0, not {batch_size}")

        # return a tile at a time
        if batch_size == 0:
            for tile_i in tqdm(
                range(self.n_tiles), disable=not progress_bar, unit=" tiles"
            ):
                yield tile_i, self.get_tile(data, tile_i, copy_data=copy_data)

        # return in batches
        if batch_size > 0:
            # check for drop_last
            length = (
                (self.n_tiles - (self.n_tiles % batch_size))
                if drop_last
                else self.n_tiles
            )

            for tile_i in tqdm(
                range(0, length, batch_size), disable=not progress_bar, unit=" batches"
            ):
                tiles = np.stack(
                    [
                        self.get_tile(data, x, copy_data=copy_data)
                        for x in range(tile_i, min(tile_i + batch_size, length))
                    ]
                )
                yield tile_i // batch_size, tiles

Iterates through tiles of the given data array. This method can also be accessed by Tiler.__call__().

Args
  • data (np.ndarray or callable): The data array on which the tiling will be performed. A callable can be supplied to load data into memory instead of slicing from an array. The callable should take integers as input, the smallest tile corner coordinates and tile size in each dimension, and output numpy array.

    e.g. python-bioformats

    >>> tileSize = 2000
    >>> tiler = Tiler((sizeX, sizeY, sizeC), (tileSize, tileSize, sizeC))
    >>> def reader_func(*args):
    >>>     X, Y, W, H = args[0], args[1], args[3], args[4]
    >>>     return reader.read(XYWH=[X, Y, W, H])
    >>> for t_id, tile in tiler.iterate(reader_func):
    >>>     pass
    

    open-slide

    >>> reader_func = lambda *args: wsi.read_region([args[0], args[1]], 0, [args[3], args[4]])
    >>> for t_id, tile in tiler.iterate(reader_func):
    >>>     pass
    
  • progress_bar (bool): Specifies whether to show the progress bar or not. Uses tqdm package. Default is False.
  • batch_size (int): Specifies returned batch size. If batch_size == 0, return one tile at a time. If batch_size >= 1, return in batches (returned shape: [batch_size, *tile_shape]). Default is 0.
  • drop_last (bool): Specifies whether to drop last non-full batch. Used only when batch_size > 0. Default is False.
  • copy_data (bool): Specifies whether to copy the tile before returning it. If copy_data == False, returns a view. Default is True.
Yields

(int, np.ndarray): Tuple with integer tile number and array tile data.

#   def get_tile( self, data: Union[numpy.ndarray, Callable[..., numpy.ndarray]], tile_id: int, copy_data: bool = True ) -> numpy.ndarray:
View Source
    def get_tile(
        self,
        data: Union[np.ndarray, Callable[..., np.ndarray]],
        tile_id: int,
        copy_data: bool = True,
    ) -> np.ndarray:
        """Returns an individual tile.

        Args:
            data (np.ndarray or callable): Data from which `tile_id`-th tile will be taken. A callable can be
                supplied to load data into memory instead of slicing from an array. The callable should take integers
                as input, the smallest tile corner coordinates and tile size in each dimension, and output numpy array.

                e.g.
                *python-bioformats*
                ```python
                >>> tileSize = 2000
                >>> tiler = Tiler((sizeX, sizeY, sizeC), (tileSize, tileSize, sizeC))
                >>> def reader_func(*args):
                >>>     X, Y, W, H = args[0], args[1], args[3], args[4]
                >>>     return reader.read(XYWH=[X, Y, W, H])
                >>> tiler.get_tile(reader_func, 0)
                ```
                *open-slide*
                ```python
                >>> reader_func = lambda *args: wsi.read_region([args[0], args[1]], 0, [args[3], args[4]])
                >>> tiler.get_tile(reader_func, 0)
                ```

            tile_id (int): Specifies which tile to return. Must be smaller than the total number of tiles.

            copy_data (bool): Specifies whether returned tile is a copy.
                If `copy_data == False` returns a view.
                Default is True.

        Returns:
            np.ndarray: Content of tile number `tile_id`, padded if necessary.
        """

        if (tile_id < 0) or (tile_id >= self.n_tiles):
            raise IndexError(
                f"Out of bounds, there is no tile {tile_id}."
                f"There are {len(self) - 1} tiles, starting from index 0."
            )

        if (
            isinstance(data, np.ndarray)
            and np.not_equal(data.shape, self.data_shape).any()
        ):
            raise ValueError(
                f"Shape of provided data array ({data.shape}) does not match "
                f"same as Tiler's data_shape ({tuple(self.data_shape)})."
            )

        # get tile data
        tile_corner = self._tile_index[tile_id] * self._tile_step
        # take the lesser of the tile shape and the distance to the edge
        sampling = [
            slice(
                tile_corner[d],
                np.min([self.data_shape[d], tile_corner[d] + self.tile_shape[d]]),
            )
            for d in range(self._n_dim)
        ]

        if callable(data):
            sampling = [x.stop - x.start for x in sampling]
            tile_data = data(*tile_corner, *sampling)
        else:
            tile_data = data[tuple(sampling)]

        if copy_data:
            tile_data = tile_data.copy()

        shape_diff = self.tile_shape - tile_data.shape
        if (self.mode != "irregular") and np.any(shape_diff > 0):
            if self.mode == "constant":
                tile_data = np.pad(
                    tile_data,
                    list((0, diff) for diff in shape_diff),
                    mode=self.mode,
                    constant_values=self.constant_value,
                )
            elif self.mode == "reflect" or self.mode == "edge" or self.mode == "wrap":
                tile_data = np.pad(
                    tile_data, list((0, diff) for diff in shape_diff), mode=self.mode
                )

        return tile_data

Returns an individual tile.

Args
  • data (np.ndarray or callable): Data from which tile_id-th tile will be taken. A callable can be supplied to load data into memory instead of slicing from an array. The callable should take integers as input, the smallest tile corner coordinates and tile size in each dimension, and output numpy array.

    e.g. python-bioformats

    >>> tileSize = 2000
    >>> tiler = Tiler((sizeX, sizeY, sizeC), (tileSize, tileSize, sizeC))
    >>> def reader_func(*args):
    >>>     X, Y, W, H = args[0], args[1], args[3], args[4]
    >>>     return reader.read(XYWH=[X, Y, W, H])
    >>> tiler.get_tile(reader_func, 0)
    

    open-slide

    >>> reader_func = lambda *args: wsi.read_region([args[0], args[1]], 0, [args[3], args[4]])
    >>> tiler.get_tile(reader_func, 0)
    
  • tile_id (int): Specifies which tile to return. Must be smaller than the total number of tiles.
  • copy_data (bool): Specifies whether returned tile is a copy. If copy_data == False returns a view. Default is True.
Returns

np.ndarray: Content of tile number tile_id, padded if necessary.

#   def get_all_tiles( self, data: Union[numpy.ndarray, Callable[..., numpy.ndarray]], axis: int = 0, copy_data: bool = True ) -> numpy.ndarray:
View Source
    def get_all_tiles(
        self,
        data: Union[np.ndarray, Callable[..., np.ndarray]],
        axis: int = 0,
        copy_data: bool = True,
    ) -> np.ndarray:
        """Returns all tiles joined along a new axis. Does not work for `Tiler.mode = 'irregular'`.

        The `axis` parameter specifies the index of the new axis in the dimensions of the result.
        For example, if `axis=0` it will be the first dimension and if `axis=-1` it will be the last dimension.

        For more information about `data` and `copy_data` parameters, see `Tiler.get_tile()`.

        Args:
            data (np.ndarray or callable): Data which will be tiled. A callable can be supplied to load data into memory
                instead of slicing from an array. The callable should take integers as input, the smallest tile corner
                coordinates and tile size in each dimension, and output numpy array.

            axis (int): The axis in the result array along which the tiles are stacked.

            copy_data (bool): Specifies whether returned tile is a copy.
                If `copy_data == False` returns a view.
                Default is True.

        Returns:
            np.ndarray: All tiles stacked along a new axis.
        """

        if self.mode == "irregular":
            raise ValueError("get_all_tiles does not support irregular mode")

        return np.stack(
            [self.get_tile(data, x, copy_data=copy_data) for x in range(self.n_tiles)],
            axis=axis,
        )

Returns all tiles joined along a new axis. Does not work for Tiler.mode = 'irregular'.

The axis parameter specifies the index of the new axis in the dimensions of the result. For example, if axis=0 it will be the first dimension and if axis=-1 it will be the last dimension.

For more information about data and copy_data parameters, see Tiler.get_tile().

Args
  • data (np.ndarray or callable): Data which will be tiled. A callable can be supplied to load data into memory instead of slicing from an array. The callable should take integers as input, the smallest tile corner coordinates and tile size in each dimension, and output numpy array.
  • axis (int): The axis in the result array along which the tiles are stacked.
  • copy_data (bool): Specifies whether returned tile is a copy. If copy_data == False returns a view. Default is True.
Returns

np.ndarray: All tiles stacked along a new axis.

#   def get_tile_bbox( self, tile_id: int, with_channel_dim: bool = False, all_corners: bool = False ) -> Union[Tuple[numpy.ndarray, numpy.ndarray], numpy.ndarray]:
View Source
    def get_tile_bbox(
        self,
        tile_id: int,
        with_channel_dim: bool = False,
        all_corners: bool = False,
    ) -> Union[Tuple[np.ndarray, np.ndarray], np.ndarray]:
        """Returns coordinates of the opposite corners of the bounding box (hyperrectangle?) of the tile on padded data.

        Args:
            tile_id (int): Specifies which tile's bounding coordinates will be returned.
                Must be between 0 and the total number of tiles.

            with_channel_dim (bool): Specifies whether to return shape with channel dimension or without.
                Default is False.

            all_corners (bool): If True, returns all vertices of the bounding box.
                Default is False.

        Returns:
            (np.ndarray, np.ndarray): Smallest (bottom-left) and largest (top-right) corners of the bounding box.

            np.ndarray: All corners of the bounding box, if `all_corners=True`.
        """

        if (tile_id < 0) or (tile_id >= self.n_tiles):
            raise IndexError(
                f"Out of bounds, there is no tile {tile_id}. "
                f"There are {len(self) - 1} tiles, starting from index 0."
            )

        # find min and max vertices
        bottom_left_corner = self._tile_step * self.get_tile_mosaic_position(
            tile_id, True
        )
        top_right_corner = bottom_left_corner + self.tile_shape

        # remove channel dimension if not required
        if self.channel_dimension is not None and not with_channel_dim:
            dim_indices = list(range(self.channel_dimension)) + list(
                range(self.channel_dimension + 1, len(self._tile_step))
            )
            bottom_left_corner = bottom_left_corner[dim_indices]
            top_right_corner = top_right_corner[dim_indices]

        # by default, return only min/max vertices
        if not all_corners:
            return bottom_left_corner, top_right_corner

        # otherwise, return all vertices of the bbox
        # inspired by https://stackoverflow.com/a/57065356/1668421
        # but instead create an indexing array from cartesian product of bits
        # and use it to sample intervals
        else:
            n_dim: int = len(bottom_left_corner)  # already channel_dimension adjusted
            mins = np.minimum(bottom_left_corner, top_right_corner)
            maxs = np.maximum(bottom_left_corner, top_right_corner)
            intervals = np.stack([mins, maxs], -1)
            indexing = np.array(list(itertools.product([0, 1], repeat=n_dim)))
            corners = np.stack([intervals[x][indexing.T[x]] for x in range(n_dim)], -1)
            return corners

Returns coordinates of the opposite corners of the bounding box (hyperrectangle?) of the tile on padded data.

Args
  • tile_id (int): Specifies which tile's bounding coordinates will be returned. Must be between 0 and the total number of tiles.
  • with_channel_dim (bool): Specifies whether to return shape with channel dimension or without. Default is False.
  • all_corners (bool): If True, returns all vertices of the bounding box. Default is False.
Returns

(np.ndarray, np.ndarray): Smallest (bottom-left) and largest (top-right) corners of the bounding box.

np.ndarray: All corners of the bounding box, if all_corners=True.

#   def get_tile_mosaic_position(self, tile_id: int, with_channel_dim: bool = False) -> numpy.ndarray:
View Source
    def get_tile_mosaic_position(
        self, tile_id: int, with_channel_dim: bool = False
    ) -> np.ndarray:
        """Returns tile position in the mosaic.

        Args:
          tile_id (int): Specifies which tile's mosaic position will be returned. \
            Must be smaller than the total number of tiles.

          with_channel_dim (bool): Specifies whether to return position with channel dimension or without.
            Default is False.

        Returns:
            np.ndarray: Tile mosaic position (tile position relative to other tiles).
        """
        if (tile_id < 0) or (tile_id >= self.n_tiles):
            raise IndexError(
                f"Out of bounds, there is no tile {tile_id}. "
                f"There are {len(self) - 1} tiles, starting from index 0."
            )

        if self.channel_dimension is not None and not with_channel_dim:
            return self._tile_index[tile_id][
                ~(np.arange(self._n_dim) == self.channel_dimension)
            ]
        return self._tile_index[tile_id]

Returns tile position in the mosaic.

Args
  • tile_id (int): Specifies which tile's mosaic position will be returned. Must be smaller than the total number of tiles.
  • with_channel_dim (bool): Specifies whether to return position with channel dimension or without. Default is False.
Returns

np.ndarray: Tile mosaic position (tile position relative to other tiles).

#   def get_mosaic_shape(self, with_channel_dim: bool = False) -> numpy.ndarray:
View Source
    def get_mosaic_shape(self, with_channel_dim: bool = False) -> np.ndarray:
        """Returns mosaic shape.

        Args:
            with_channel_dim (bool):
                Specifies whether to return shape with channel dimension or without. Defaults to False.

        Returns:
            np.ndarray: Shape of tiles mosaic.
        """
        if self.channel_dimension is not None and not with_channel_dim:
            return self._indexing_shape[
                ~(np.arange(self._n_dim) == self.channel_dimension)
            ]
        return self._indexing_shape

Returns mosaic shape.

Args
  • with_channel_dim (bool): Specifies whether to return shape with channel dimension or without. Defaults to False.
Returns

np.ndarray: Shape of tiles mosaic.

#   def calculate_padding(self) -> Tuple[numpy.ndarray, List[Tuple[int, int]]]:
View Source
    def calculate_padding(self) -> Tuple[np.ndarray, List[Tuple[int, int]]]:
        """Calculate a frame padding for the current Tiler parameters.
        The padding is overlap//2 or tile_step//2, whichever is bigger.
        The method returns a tuple (new_shape, padding) where padding is
        ((before_1, after_1), … (before_N, after_N)), unique pad widths for each axis N.

        In the usual workflow, you'd recalculate tiling settings and then apply padding, prior to tiling.
        Then when merging, pass padding to `Merger.merge(extra_padding=padding, ...)`:
        ```python
        >>> tiler = Tiler(...)
        >>> merger = Merger(tiler, ...)
        >>> new_shape, padding = tiler.calculate_padding()
        >>> tiler.recalculate(data_shape=new_shape)
        >>> padded_data = np.pad(data, pad_width=padding, mode="reflect")
        >>> for tile_id, tile in tiler(padded_data):
        >>>     processed_tile = process(tile)
        >>>     merger.add(tile_id, processed_tile)
        >>> final_image = merger.merge(extra_padding=padding)
        ```
        Return:
            np.ndarray: Resulting shape when padding is applied.

            List[Tuple[int, int]]: Calculated padding.
        """

        # Choosing padding
        pre_pad = np.maximum(self._tile_step // 2, self._tile_overlap // 2)
        post_pad = pre_pad + np.mod(self._tile_step, 2)

        new_shape = pre_pad + self.data_shape + post_pad
        padding = list(zip(pre_pad, post_pad))

        return new_shape, padding

Calculate a frame padding for the current Tiler parameters. The padding is overlap//2 or tile_step//2, whichever is bigger. The method returns a tuple (new_shape, padding) where padding is ((before_1, after_1), … (before_N, after_N)), unique pad widths for each axis N.

In the usual workflow, you'd recalculate tiling settings and then apply padding, prior to tiling. Then when merging, pass padding to Merger.merge(extra_padding=padding, ...):

>>> tiler = Tiler(...)
>>> merger = Merger(tiler, ...)
>>> new_shape, padding = tiler.calculate_padding()
>>> tiler.recalculate(data_shape=new_shape)
>>> padded_data = np.pad(data, pad_width=padding, mode="reflect")
>>> for tile_id, tile in tiler(padded_data):
>>>     processed_tile = process(tile)
>>>     merger.add(tile_id, processed_tile)
>>> final_image = merger.merge(extra_padding=padding)
Return

np.ndarray: Resulting shape when padding is applied.

List[Tuple[int, int]]: Calculated padding.

#   class Merger:
View Source
class Merger:

    SUPPORTED_WINDOWS = [
        "boxcar",
        "triang",
        "blackman",
        "hamming",
        "hann",
        "bartlett",
        "parzen",
        "bohman",
        "blackmanharris",
        "nuttall",
        "barthann",
        "overlap-tile",
    ]
    r"""
    Supported windows:
    - 'boxcar' (default)  
        Boxcar window: the weight of each is tile element is 1.  
        Also known as rectangular window or Dirichlet window (and equivalent to no window at all).
    - 'triang'  
        Triangular window.
    - 'blackman'  
        Blackman window.
    - 'hamming'  
        Hamming window.
    - 'hann'  
        Hann window.
    - 'bartlett'  
        Bartlett window.
    - 'parzen'  
        Parzen window.
    - 'bohman'  
        Bohman window.
    - 'blackmanharris'  
        Minimum 4-term Blackman-Harris window.
    - 'nuttall'  
        Minimum 4-term Blackman-Harris window according to Nuttall.
    - 'barthann'  
        Bartlett-Hann window.    
    - 'overlap-tile'  
        Creates a boxcar window for the non-overlapping, middle part of tile, and zeros everywhere else.
        Requires applying padding calculated with `Tiler.calculate_padding()` for correct results.
        (based on Ronneberger et al. 2015, U-Net paper)
    """

    def __init__(
        self,
        tiler: Tiler,
        window: Union[None, str, np.ndarray] = None,
        logits: int = 0,
        save_visits: bool = True,
        data_dtype: npt.DTypeLike = np.float32,
        weights_dtype: npt.DTypeLike = np.float32,
    ):
        """Merger holds cumulative result buffers for merging tiles created by a given Tiler
        and the window function that is applied to added tiles.

        There are two required np.float64 buffers: `self.data` and `self.weights_sum`
        and one optional np.uint32 `self.data_visits` (see below `save_visits` argument).

        TODO:
            - generate window depending on tile border type
                # some reference for the future borders generation
                # 1d = 3 types of tiles: 2 corners and middle
                # 2d = 9 types of tiles: 4 corners, 4 tiles with 1 edge and middle
                # 3d = 25 types of tiles: 8 corners, 12 tiles with 2 edges, 6 tiles with one edge and middle
                # corners: 2^ndim
                # tiles: 2*ndim*nedges

        Args:
            tiler (Tiler): Tiler with which the tiles were originally created.

            window (None, str or np.ndarray): Specifies which window function to use for tile merging.
                Must be one of `Merger.SUPPORTED_WINDOWS` or a numpy array with the same size as the tile.
                Default is None which creates a boxcar window (constant 1s).

            logits (int): Specify whether to add logits dimensions in front of the data array. Default is `0`.

            save_visits (bool): Specify whether to save which elements has been modified and how many times in
                `self.data_visits`. Can be disabled to save some memory. Default is `True`.

            data_dtype (np.dtype): Specify data type for data buffer that stores cumulative result.
                Default is `np.float32`.

            weights_dtype (np.dtype): Specify data type for weights buffer that stores cumulative weights and window array.
                If you don't need precision but would rather save memory you can use `np.float16`.
                Likewise, on the opposite, you can use `np.float64`.
                Default is `np.float32`.

        """

        self.tiler = tiler

        # Logits support
        if not isinstance(logits, int) or logits < 0:
            raise ValueError(
                f"Logits must be an integer 0 or a positive number ({logits})."
            )
        self.logits = int(logits)

        # Generate data and normalization arrays
        self.data = self.data_visits = self.weights_sum = None
        self.data_dtype = data_dtype
        self.weights_dtype = weights_dtype
        self.reset(save_visits)

        # Generate window function
        self.window = None
        self.set_window(window)

    def _generate_window(self, window: str, shape: Union[Tuple, List]) -> np.ndarray:
        """Generate n-dimensional window according to the given shape.
        Adapted from: https://stackoverflow.com/a/53588640/1668421

        Args:
            window (str): Specifies window function. Must be one of `Merger.SUPPORTED_WINDOWS`.

            shape (tuple or list): Shape of the requested window.

        Returns:
            np.ndarray: n-dimensional window of the given shape and function
        """

        w = np.ones(shape, dtype=self.weights_dtype)
        overlap = self.tiler._tile_overlap
        for axis, length in enumerate(shape):
            if axis == self.tiler.channel_dimension:
                # channel dimension should have weight of 1 everywhere
                win = get_window("boxcar", length)
            else:
                if window == "overlap-tile":
                    axis_overlap = overlap[axis] // 2
                    win = np.zeros(length)
                    win[axis_overlap:-axis_overlap] = 1
                else:
                    win = get_window(window, length)

            for i in range(len(shape)):
                if i == axis:
                    continue
                else:
                    win = np.stack([win] * shape[i], axis=i)

            w *= win.astype(self.weights_dtype)

        return w

    def set_window(self, window: Union[None, str, np.ndarray] = None) -> None:
        """Sets window function depending on the given window function.

        Args:
            window (None, str or np.ndarray): Specifies which window function to use for tile merging.
                Must be one of `Merger.SUPPORTED_WINDOWS` or a numpy array with the same size as the tile.
                If passed None sets a boxcar window (constant 1s).

        Returns:
            None
        """

        # Warn user that changing window type after some elements were already visited is a bad idea.
        if np.count_nonzero(self.data_visits):
            warnings.warn(
                "You are setting window type after some elements were already added."
            )

        # Default window is boxcar
        if window is None:
            window = "boxcar"

        # Generate or set a window function
        if isinstance(window, str):
            if window not in self.SUPPORTED_WINDOWS:
                raise ValueError("Unsupported window, please check docs")
            self.window = self._generate_window(window, self.tiler.tile_shape)
        elif isinstance(window, np.ndarray):
            if not np.array_equal(window.shape, self.tiler.tile_shape):
                raise ValueError(
                    f"Window function must have the same shape as tile shape."
                )
            self.window = window.astype(self.weights_dtype)
        else:
            raise ValueError(
                f"Unsupported type for window function ({type(window)}), expected str or np.ndarray."
            )

    def reset(self, save_visits: bool = True) -> None:
        """Reset data, weights and optional data_visits buffers.

        Should be done after finishing merging full tile set and before starting processing the next tile set.

        Args:
            save_visits (bool): Specify whether to save which elements has been modified and how many times in
                `self.data_visits`. Can be disabled to save some memory. Default is `True`.

        Returns:
            None
        """

        padded_data_shape = self.tiler._new_shape

        # Image holds sum of all processed tiles multiplied by the window
        if self.logits:
            self.data = np.zeros(
                (self.logits, *padded_data_shape), dtype=self.data_dtype
            )
        else:
            self.data = np.zeros(padded_data_shape, dtype=self.data_dtype)

        # Data visits holds the number of times each element was assigned
        if save_visits:
            self.data_visits = np.zeros(
                padded_data_shape, dtype=np.uint32
            )  # uint32 ought to be enough for anyone :)

        # Total data window (weight) coefficients
        self.weights_sum = np.zeros(padded_data_shape, dtype=self.weights_dtype)

    def add(self, tile_id: int, data: np.ndarray) -> None:
        """Adds `tile_id`-th tile into Merger.

        Args:
            tile_id (int): Specifies which tile it is.

            data (np.ndarray): Specifies tile data.

        Returns:
            None
        """
        if tile_id < 0 or tile_id >= len(self.tiler):
            raise IndexError(
                f"Out of bounds, there is no tile {tile_id}. "
                f"There are {len(self.tiler)} tiles, starting from index 0."
            )

        data_shape = np.array(data.shape)
        expected_tile_shape = (
            ((self.logits,) + tuple(self.tiler.tile_shape))
            if self.logits > 0
            else tuple(self.tiler.tile_shape)
        )

        if self.tiler.mode != "irregular":
            if not np.all(np.equal(data_shape, expected_tile_shape)):
                raise ValueError(
                    f"Passed data shape ({data_shape}) "
                    f"does not fit expected tile shape ({expected_tile_shape})."
                )
        else:
            if not np.all(np.less_equal(data_shape, expected_tile_shape)):
                raise ValueError(
                    f"Passed data shape ({data_shape}) "
                    f"must be less or equal than tile shape ({expected_tile_shape})."
                )

        # Select coordinates for data
        shape_diff = expected_tile_shape - data_shape
        a, b = self.tiler.get_tile_bbox(tile_id, with_channel_dim=True)

        sl = [slice(x, y - shape_diff[i]) for i, (x, y) in enumerate(zip(a, b))]
        win_sl = [
            slice(None, -diff) if (diff > 0) else slice(None, None)
            for diff in shape_diff
        ]

        if self.logits > 0:
            self.data[tuple([slice(None, None, None)] + sl)] += (
                data * self.window[tuple(win_sl[1:])]
            )
            self.weights_sum[tuple(sl)] += self.window[tuple(win_sl[1:])]
        else:
            self.data[tuple(sl)] += data * self.window[tuple(win_sl)]
            self.weights_sum[tuple(sl)] += self.window[tuple(win_sl)]

        if self.data_visits is not None:
            self.data_visits[tuple(sl)] += 1

    def add_batch(self, batch_id: int, batch_size: int, data: np.ndarray) -> None:
        """Adds `batch_id`-th batch of `batch_size` tiles into Merger.

        Args:
            batch_id (int): Specifies batch number, must be >= 0.

            batch_size (int): Specifies batch size, must be >= 0.

            data (np.ndarray): Tile data array, must have shape `[batch, *tile_shape]

        Returns:
            None
        """

        # calculate total number of batches
        div, mod = np.divmod(len(self.tiler), batch_size)
        n_batches = (div + 1) if mod > 0 else div

        if batch_id < 0 or batch_id >= n_batches:
            raise IndexError(
                f"Out of bounds. There are {n_batches} batches of {batch_size}, starting from index 0."
            )

        # add each tile in a batch with computed tile_id
        for data_i, tile_i in enumerate(
            range(
                batch_id * batch_size, min((batch_id + 1) * batch_size, len(self.tiler))
            )
        ):
            self.add(tile_i, data[data_i])

    def _unpad(
        self, data: np.ndarray, extra_padding: Optional[List[Tuple[int, int]]] = None
    ):
        """Slices/unpads data according to merger and tiler settings, as well as additional padding.

        Args:
            data (np.ndarray): Data to be sliced.

            extra_padding (tuple of tuples of two ints, optional): Specifies padding that was applied to the data.
                Number of values padded to the edges of each axis.
                ((before_1, after_1), … (before_N, after_N)) unique pad widths for each axis.
                Default is None.
        """
        if extra_padding:
            sl = [
                slice(pad_from, shape - pad_to)
                for shape, (pad_from, pad_to) in zip(
                    self.tiler.data_shape, extra_padding
                )
            ]
        else:
            sl = [
                slice(None, self.tiler.data_shape[i])
                for i in range(len(self.tiler.data_shape))
            ]

        # if merger has logits dimension, add another slicing in front
        if self.logits:
            sl = [slice(None, None, None)] + sl

        return data[tuple(sl)]

    def merge(
        self,
        unpad: bool = True,
        extra_padding: Optional[List[Tuple[int, int]]] = None,
        argmax: bool = False,
        normalize_by_weights: bool = True,
        dtype: np.dtype = np.float64,
    ) -> np.ndarray:
        """Returns merged data array obtained from added tiles.

        Args:
            unpad (bool): If unpad is True, removes padded array elements. Default is True.

            extra_padding (tuple of tuples of two ints, optional): Specifies padding that was applied to the data.
                Number of values padded to the edges of each axis.
                ((before_1, after_1), … (before_N, after_N)) unique pad widths for each axis.
                Default is None.

            argmax (bool): If argmax is True, the first dimension will be argmaxed. Default is False.

            normalize_by_weights (bool): If normalize is True, the accumulated data will be divided by weights. Default is True.

            dtype (np.dtype): Specify dtype for the final . Default is `np.float64`.

        Returns:
            np.ndarray: Final merged data array obtained from added tiles.
        """

        data = self.data

        if normalize_by_weights:
            # ignoring division by zero
            # alternatively, set values < atol to 1
            # https://github.com/the-lay/tiler/blob/46e948bb2bd7a909e954baf87a0c15b384109fde/tiler/merger.py#L314
            # TODO check which way is better
            #  ignoring should be more precise without atol
            #  but can hide other errors
            with np.errstate(divide="ignore", invalid="ignore"):
                data = np.nan_to_num(data / self.weights_sum)

        if unpad:
            data = self._unpad(data, extra_padding)

        if argmax:
            data = np.argmax(data, 0)

        return data.astype(dtype)
#   Merger( tiler: tiler.tiler.Tiler, window: Union[NoneType, str, numpy.ndarray] = None, logits: int = 0, save_visits: bool = True, data_dtype: Union[numpy.dtype, NoneType, type, numpy._dtype_like._SupportsDType, str, Tuple[Any, int], Tuple[Any, Union[int, Sequence[int]]], List[Any], numpy._dtype_like._DTypeDict, Tuple[Any, Any]] = <class 'numpy.float32'>, weights_dtype: Union[numpy.dtype, NoneType, type, numpy._dtype_like._SupportsDType, str, Tuple[Any, int], Tuple[Any, Union[int, Sequence[int]]], List[Any], numpy._dtype_like._DTypeDict, Tuple[Any, Any]] = <class 'numpy.float32'> )
View Source
    def __init__(
        self,
        tiler: Tiler,
        window: Union[None, str, np.ndarray] = None,
        logits: int = 0,
        save_visits: bool = True,
        data_dtype: npt.DTypeLike = np.float32,
        weights_dtype: npt.DTypeLike = np.float32,
    ):
        """Merger holds cumulative result buffers for merging tiles created by a given Tiler
        and the window function that is applied to added tiles.

        There are two required np.float64 buffers: `self.data` and `self.weights_sum`
        and one optional np.uint32 `self.data_visits` (see below `save_visits` argument).

        TODO:
            - generate window depending on tile border type
                # some reference for the future borders generation
                # 1d = 3 types of tiles: 2 corners and middle
                # 2d = 9 types of tiles: 4 corners, 4 tiles with 1 edge and middle
                # 3d = 25 types of tiles: 8 corners, 12 tiles with 2 edges, 6 tiles with one edge and middle
                # corners: 2^ndim
                # tiles: 2*ndim*nedges

        Args:
            tiler (Tiler): Tiler with which the tiles were originally created.

            window (None, str or np.ndarray): Specifies which window function to use for tile merging.
                Must be one of `Merger.SUPPORTED_WINDOWS` or a numpy array with the same size as the tile.
                Default is None which creates a boxcar window (constant 1s).

            logits (int): Specify whether to add logits dimensions in front of the data array. Default is `0`.

            save_visits (bool): Specify whether to save which elements has been modified and how many times in
                `self.data_visits`. Can be disabled to save some memory. Default is `True`.

            data_dtype (np.dtype): Specify data type for data buffer that stores cumulative result.
                Default is `np.float32`.

            weights_dtype (np.dtype): Specify data type for weights buffer that stores cumulative weights and window array.
                If you don't need precision but would rather save memory you can use `np.float16`.
                Likewise, on the opposite, you can use `np.float64`.
                Default is `np.float32`.

        """

        self.tiler = tiler

        # Logits support
        if not isinstance(logits, int) or logits < 0:
            raise ValueError(
                f"Logits must be an integer 0 or a positive number ({logits})."
            )
        self.logits = int(logits)

        # Generate data and normalization arrays
        self.data = self.data_visits = self.weights_sum = None
        self.data_dtype = data_dtype
        self.weights_dtype = weights_dtype
        self.reset(save_visits)

        # Generate window function
        self.window = None
        self.set_window(window)

Merger holds cumulative result buffers for merging tiles created by a given Tiler and the window function that is applied to added tiles.

There are two required np.float64 buffers: self.data and self.weights_sum and one optional np.uint32 self.data_visits (see below save_visits argument).

TODO
  • generate window depending on tile border type # some reference for the future borders generation # 1d = 3 types of tiles: 2 corners and middle # 2d = 9 types of tiles: 4 corners, 4 tiles with 1 edge and middle # 3d = 25 types of tiles: 8 corners, 12 tiles with 2 edges, 6 tiles with one edge and middle # corners: 2^ndim # tiles: 2ndimnedges
Args
  • tiler (Tiler): Tiler with which the tiles were originally created.
  • window (None, str or np.ndarray): Specifies which window function to use for tile merging. Must be one of Merger.SUPPORTED_WINDOWS or a numpy array with the same size as the tile. Default is None which creates a boxcar window (constant 1s).
  • logits (int): Specify whether to add logits dimensions in front of the data array. Default is 0.
  • save_visits (bool): Specify whether to save which elements has been modified and how many times in self.data_visits. Can be disabled to save some memory. Default is True.
  • data_dtype (np.dtype): Specify data type for data buffer that stores cumulative result. Default is np.float32.
  • weights_dtype (np.dtype): Specify data type for weights buffer that stores cumulative weights and window array. If you don't need precision but would rather save memory you can use np.float16. Likewise, on the opposite, you can use np.float64. Default is np.float32.
#   SUPPORTED_WINDOWS = ['boxcar', 'triang', 'blackman', 'hamming', 'hann', 'bartlett', 'parzen', 'bohman', 'blackmanharris', 'nuttall', 'barthann', 'overlap-tile']

Supported windows:

  • 'boxcar' (default)
    Boxcar window: the weight of each is tile element is 1.
    Also known as rectangular window or Dirichlet window (and equivalent to no window at all).
  • 'triang'
    Triangular window.
  • 'blackman'
    Blackman window.
  • 'hamming'
    Hamming window.
  • 'hann'
    Hann window.
  • 'bartlett'
    Bartlett window.
  • 'parzen'
    Parzen window.
  • 'bohman'
    Bohman window.
  • 'blackmanharris'
    Minimum 4-term Blackman-Harris window.
  • 'nuttall'
    Minimum 4-term Blackman-Harris window according to Nuttall.
  • 'barthann'
    Bartlett-Hann window.
  • 'overlap-tile'
    Creates a boxcar window for the non-overlapping, middle part of tile, and zeros everywhere else. Requires applying padding calculated with Tiler.calculate_padding() for correct results. (based on Ronneberger et al. 2015, U-Net paper)
#   def set_window(self, window: Union[NoneType, str, numpy.ndarray] = None) -> None:
View Source
    def set_window(self, window: Union[None, str, np.ndarray] = None) -> None:
        """Sets window function depending on the given window function.

        Args:
            window (None, str or np.ndarray): Specifies which window function to use for tile merging.
                Must be one of `Merger.SUPPORTED_WINDOWS` or a numpy array with the same size as the tile.
                If passed None sets a boxcar window (constant 1s).

        Returns:
            None
        """

        # Warn user that changing window type after some elements were already visited is a bad idea.
        if np.count_nonzero(self.data_visits):
            warnings.warn(
                "You are setting window type after some elements were already added."
            )

        # Default window is boxcar
        if window is None:
            window = "boxcar"

        # Generate or set a window function
        if isinstance(window, str):
            if window not in self.SUPPORTED_WINDOWS:
                raise ValueError("Unsupported window, please check docs")
            self.window = self._generate_window(window, self.tiler.tile_shape)
        elif isinstance(window, np.ndarray):
            if not np.array_equal(window.shape, self.tiler.tile_shape):
                raise ValueError(
                    f"Window function must have the same shape as tile shape."
                )
            self.window = window.astype(self.weights_dtype)
        else:
            raise ValueError(
                f"Unsupported type for window function ({type(window)}), expected str or np.ndarray."
            )

Sets window function depending on the given window function.

Args
  • window (None, str or np.ndarray): Specifies which window function to use for tile merging. Must be one of Merger.SUPPORTED_WINDOWS or a numpy array with the same size as the tile. If passed None sets a boxcar window (constant 1s).
Returns

None

#   def reset(self, save_visits: bool = True) -> None:
View Source
    def reset(self, save_visits: bool = True) -> None:
        """Reset data, weights and optional data_visits buffers.

        Should be done after finishing merging full tile set and before starting processing the next tile set.

        Args:
            save_visits (bool): Specify whether to save which elements has been modified and how many times in
                `self.data_visits`. Can be disabled to save some memory. Default is `True`.

        Returns:
            None
        """

        padded_data_shape = self.tiler._new_shape

        # Image holds sum of all processed tiles multiplied by the window
        if self.logits:
            self.data = np.zeros(
                (self.logits, *padded_data_shape), dtype=self.data_dtype
            )
        else:
            self.data = np.zeros(padded_data_shape, dtype=self.data_dtype)

        # Data visits holds the number of times each element was assigned
        if save_visits:
            self.data_visits = np.zeros(
                padded_data_shape, dtype=np.uint32
            )  # uint32 ought to be enough for anyone :)

        # Total data window (weight) coefficients
        self.weights_sum = np.zeros(padded_data_shape, dtype=self.weights_dtype)

Reset data, weights and optional data_visits buffers.

Should be done after finishing merging full tile set and before starting processing the next tile set.

Args
  • save_visits (bool): Specify whether to save which elements has been modified and how many times in self.data_visits. Can be disabled to save some memory. Default is True.
Returns

None

#   def add(self, tile_id: int, data: numpy.ndarray) -> None:
View Source
    def add(self, tile_id: int, data: np.ndarray) -> None:
        """Adds `tile_id`-th tile into Merger.

        Args:
            tile_id (int): Specifies which tile it is.

            data (np.ndarray): Specifies tile data.

        Returns:
            None
        """
        if tile_id < 0 or tile_id >= len(self.tiler):
            raise IndexError(
                f"Out of bounds, there is no tile {tile_id}. "
                f"There are {len(self.tiler)} tiles, starting from index 0."
            )

        data_shape = np.array(data.shape)
        expected_tile_shape = (
            ((self.logits,) + tuple(self.tiler.tile_shape))
            if self.logits > 0
            else tuple(self.tiler.tile_shape)
        )

        if self.tiler.mode != "irregular":
            if not np.all(np.equal(data_shape, expected_tile_shape)):
                raise ValueError(
                    f"Passed data shape ({data_shape}) "
                    f"does not fit expected tile shape ({expected_tile_shape})."
                )
        else:
            if not np.all(np.less_equal(data_shape, expected_tile_shape)):
                raise ValueError(
                    f"Passed data shape ({data_shape}) "
                    f"must be less or equal than tile shape ({expected_tile_shape})."
                )

        # Select coordinates for data
        shape_diff = expected_tile_shape - data_shape
        a, b = self.tiler.get_tile_bbox(tile_id, with_channel_dim=True)

        sl = [slice(x, y - shape_diff[i]) for i, (x, y) in enumerate(zip(a, b))]
        win_sl = [
            slice(None, -diff) if (diff > 0) else slice(None, None)
            for diff in shape_diff
        ]

        if self.logits > 0:
            self.data[tuple([slice(None, None, None)] + sl)] += (
                data * self.window[tuple(win_sl[1:])]
            )
            self.weights_sum[tuple(sl)] += self.window[tuple(win_sl[1:])]
        else:
            self.data[tuple(sl)] += data * self.window[tuple(win_sl)]
            self.weights_sum[tuple(sl)] += self.window[tuple(win_sl)]

        if self.data_visits is not None:
            self.data_visits[tuple(sl)] += 1

Adds tile_id-th tile into Merger.

Args
  • tile_id (int): Specifies which tile it is.
  • data (np.ndarray): Specifies tile data.
Returns

None

#   def add_batch(self, batch_id: int, batch_size: int, data: numpy.ndarray) -> None:
View Source
    def add_batch(self, batch_id: int, batch_size: int, data: np.ndarray) -> None:
        """Adds `batch_id`-th batch of `batch_size` tiles into Merger.

        Args:
            batch_id (int): Specifies batch number, must be >= 0.

            batch_size (int): Specifies batch size, must be >= 0.

            data (np.ndarray): Tile data array, must have shape `[batch, *tile_shape]

        Returns:
            None
        """

        # calculate total number of batches
        div, mod = np.divmod(len(self.tiler), batch_size)
        n_batches = (div + 1) if mod > 0 else div

        if batch_id < 0 or batch_id >= n_batches:
            raise IndexError(
                f"Out of bounds. There are {n_batches} batches of {batch_size}, starting from index 0."
            )

        # add each tile in a batch with computed tile_id
        for data_i, tile_i in enumerate(
            range(
                batch_id * batch_size, min((batch_id + 1) * batch_size, len(self.tiler))
            )
        ):
            self.add(tile_i, data[data_i])

Adds batch_id-th batch of batch_size tiles into Merger.

Args
  • batch_id (int): Specifies batch number, must be >= 0.
  • batch_size (int): Specifies batch size, must be >= 0.
  • data (np.ndarray): Tile data array, must have shape `[batch, *tile_shape]
Returns

None

#   def merge( self, unpad: bool = True, extra_padding: Union[List[Tuple[int, int]], NoneType] = None, argmax: bool = False, normalize_by_weights: bool = True, dtype: numpy.dtype = <class 'numpy.float64'> ) -> numpy.ndarray:
View Source
    def merge(
        self,
        unpad: bool = True,
        extra_padding: Optional[List[Tuple[int, int]]] = None,
        argmax: bool = False,
        normalize_by_weights: bool = True,
        dtype: np.dtype = np.float64,
    ) -> np.ndarray:
        """Returns merged data array obtained from added tiles.

        Args:
            unpad (bool): If unpad is True, removes padded array elements. Default is True.

            extra_padding (tuple of tuples of two ints, optional): Specifies padding that was applied to the data.
                Number of values padded to the edges of each axis.
                ((before_1, after_1), … (before_N, after_N)) unique pad widths for each axis.
                Default is None.

            argmax (bool): If argmax is True, the first dimension will be argmaxed. Default is False.

            normalize_by_weights (bool): If normalize is True, the accumulated data will be divided by weights. Default is True.

            dtype (np.dtype): Specify dtype for the final . Default is `np.float64`.

        Returns:
            np.ndarray: Final merged data array obtained from added tiles.
        """

        data = self.data

        if normalize_by_weights:
            # ignoring division by zero
            # alternatively, set values < atol to 1
            # https://github.com/the-lay/tiler/blob/46e948bb2bd7a909e954baf87a0c15b384109fde/tiler/merger.py#L314
            # TODO check which way is better
            #  ignoring should be more precise without atol
            #  but can hide other errors
            with np.errstate(divide="ignore", invalid="ignore"):
                data = np.nan_to_num(data / self.weights_sum)

        if unpad:
            data = self._unpad(data, extra_padding)

        if argmax:
            data = np.argmax(data, 0)

        return data.astype(dtype)

Returns merged data array obtained from added tiles.

Args
  • unpad (bool): If unpad is True, removes padded array elements. Default is True.
  • extra_padding (tuple of tuples of two ints, optional): Specifies padding that was applied to the data. Number of values padded to the edges of each axis. ((before_1, after_1), … (before_N, after_N)) unique pad widths for each axis. Default is None.
  • argmax (bool): If argmax is True, the first dimension will be argmaxed. Default is False.
  • normalize_by_weights (bool): If normalize is True, the accumulated data will be divided by weights. Default is True.
  • dtype (np.dtype): Specify dtype for the final . Default is np.float64.
Returns

np.ndarray: Final merged data array obtained from added tiles.