# Source code for fluidimage.topologies.preproc

"""Topology for image preprocessing (:mod:`fluidimage.topologies.preproc`)
==========================================================================

.. autoclass:: TopologyPreproc
   :members:
   :private-members:

"""

import copy
import os
import sys
from typing import Dict, Tuple

from fluiddyn.util.paramcontainer import ParamContainer
from fluidimage import SeriesOfArrays
from fluidimage.data_objects.preproc import ArraySerie as ArraySubset
from fluidimage.data_objects.preproc import PreprocResults, get_name_preproc
from fluidimage.topologies import TopologyBaseFromSeries, prepare_path_dir_result
from fluidimage.topologies.splitters import SplitterFromSeries
from fluidimage.util import imread
from fluidimage.works import image2image
from fluidimage.works.preproc import (
    WorkPreproc,
    _make_doc_with_filtered_params_doc,
)


class TopologyPreproc(TopologyBaseFromSeries):
    """Preprocess series of images.

    The most useful methods for the user (in particular :func:`compute`) are
    defined in the base class :class:`fluidimage.topologies.base.TopologyBase`.

    Parameters
    ----------

    params: ParamContainer

      A ParamContainer (created with the class method
      :func:`create_default_params`) containing the parameters for the
      computation.

    logging_level: str, {'warning', 'info', 'debug', ...}

      Logging level.

    nb_max_workers: None, int

      Maximum numbers of "workers". If None, a number is computed from the
      number of cores detected. If there are memory errors, you can try to
      decrease the number of workers.

    """

    _short_name = "pre"
    Splitter = SplitterFromSeries

    @classmethod
    def create_default_params(cls, backend="python"):
        """Class method returning the default parameters.

        Typical usage::

          params = TopologyPreproc.create_default_params()
          # modify parameters here
          ...

          topo = TopologyPreproc(params)

        Parameters
        ----------

        backend : {'python', 'opencv'}

            Specifies which backend to use.

        Returns
        -------

        params
            The object returned by ``WorkPreproc.create_default_params``,
            completed with the ``series`` attributes, the ``saving``
            parameters and an ``im2im`` child.

        """
        params = WorkPreproc.create_default_params(backend)
        params.series._set_attribs(
            {
                "str_subset": "all1by1",
                "ind_start": "first",
                "ind_stop": None,
                "ind_step": 1,
            }
        )

        params.series._set_doc(
            """
Parameters describing image loading prior to preprocessing.

- str_subset : str

    Determines the subset from the whole series of images that should be
    loaded and preprocessed together. Particularly useful when temporal
    filtering requires multiple images.

    For example, for a series of images with just one index,

    >>> str_subset = 'i:i+1' # load one image at a time
    >>> str_subset = 'i-2:i+3' # loads 5 images at a time

    Similarly for two indices,

    >>> str_subset = 'i:i+1,0' # load one image at a time, with second index fixed
    >>> str_subset = 'i-2:i+3,0' # loads 5 images at a time, with second index fixed

- ind_start : int

    Start index for the whole series of images being loaded.
    For more details: see {class}`fluiddyn.util.serieofarrays.SeriesOfArrays`.

- ind_stop : int

    Stop index for the whole series of images being loaded.
    For more details: see {class}`fluiddyn.util.serieofarrays.SeriesOfArrays`.

- ind_step : int

    Step index for the whole series of images being loaded.
    For more details: see {class}`fluiddyn.util.serieofarrays.SeriesOfArrays`.

"""
        )

        super()._add_default_params_saving(params)

        params.saving._set_attribs(
            {
                "format": "img",
                "str_subset": None,
            },
        )

        params.saving._set_doc(
            """
Parameters describing image saving after preprocessing.

- path : str or None

    Path to which preprocessed images are saved.

- how : str {'ask', 'new_dir', 'complete', 'recompute'}

    How preprocessed images must be saved if it already exists or not.

- postfix : str

    A suffix added to the new directory where preprocessed images are saved.

- format : str {'img', 'hdf5'}

    Format in which preprocessed image data must be saved.

- str_subset : str or None

    NotImplemented! Determines the sub-subset of images must be saved from
    subset of images that were loaded and preprocessed. When set as None,
    saves the middle image from every subset.

"""
        )

        params._set_child("im2im")
        image2image.complete_im2im_params_with_default(params.im2im)

        return params

    def __init__(
        self, params: ParamContainer, logging_level="info", nb_max_workers=None
    ):
        """Build the series, the result directory, the queues and the works.

        Parameters
        ----------

        params : ParamContainer
            Parameters, typically obtained from
            :func:`create_default_params`. ``params.saving.path`` is
            overwritten with the result directory actually used.

        logging_level : str
            Forwarded to the base class constructor.

        nb_max_workers : None, int
            Forwarded to the base class constructor.

        """
        self.params = params

        self.preproc_work = WorkPreproc(params)
        self.results = []
        self.display = self.preproc_work.display

        self.series = SeriesOfArrays(
            params.series.path,
            params.series.str_subset,
            ind_start=params.series.ind_start,
            ind_stop=params.series.ind_stop,
            ind_step=params.series.ind_step,
        )

        # The serie at index 0 is used to get the number of arrays per serie.
        subset = self.series.get_serie_from_index(0)
        self.nb_items_per_serie = subset.get_nb_arrays()

        # params.series.path may be a directory or something inside one.
        if os.path.isdir(params.series.path):
            path_dir = params.series.path
        else:
            path_dir = os.path.dirname(params.series.path)
        self.path_dir_input = path_dir

        path_dir_result, self.how_saving = prepare_path_dir_result(
            path_dir,
            params.saving.path,
            params.saving.postfix,
            params.saving.how,
        )

        super().__init__(
            path_dir_result=path_dir_result,
            logging_level=logging_level,
            nb_max_workers=nb_max_workers,
        )

        self.params.saving.path = self.path_dir_result

        # Define waiting queues
        queue_subsets_of_names = self.add_queue("subsets of filenames")
        queue_paths = self.add_queue("image paths")
        # Without an im2im step, "imread" feeds "make subsets" directly, so
        # both names refer to the same queue.
        queue_arrays = queue_arrays1 = self.add_queue("arrays")
        queue_subsets_of_arrays = self.add_queue("subsets of arrays")
        queue_preproc_objects = self.add_queue("preproc results")

        if params.im2im.im2im is not None:
            # Extra queue between "image2image" and "make subsets".
            queue_arrays1 = self.add_queue("arrays1")

        # Define works
        self.add_work(
            "fill (subsets_of_names, paths)",
            func_or_cls=self.fill_subsets_of_names_and_paths,
            output_queue=(queue_subsets_of_names, queue_paths),
            kind=("global", "one shot"),
        )

        self.add_work(
            "imread",
            func_or_cls=imread,
            input_queue=queue_paths,
            output_queue=queue_arrays,
            kind="io",
        )

        if params.im2im.im2im is not None:
            im2im_func = image2image.get_im2im_function_from_params(params.im2im)

            self.add_work(
                "image2image",
                func_or_cls=im2im_func,
                input_queue=queue_arrays,
                output_queue=queue_arrays1,
                kind="eat key value",
            )

        self.add_work(
            "make subsets of arrays",
            func_or_cls=self.make_subsets,
            input_queue=(queue_subsets_of_names, queue_arrays1),
            output_queue=queue_subsets_of_arrays,
            kind="global",
        )

        self.add_work(
            "preproc a subset of arrays",
            func_or_cls=self.preproc_work.calcul,
            params_cls=params,
            input_queue=queue_subsets_of_arrays,
            output_queue=queue_preproc_objects,
        )

        self.add_work(
            "save images",
            func_or_cls=self.save_preproc_object,
            input_queue=queue_preproc_objects,
            kind="io",
        )

    def save_preproc_object(self, obj: PreprocResults):
        """Save a preprocessing object

        The object is saved in ``self.path_dir_result`` and the value
        returned by ``obj.save`` is appended to ``self.results``.

        """
        ret = obj.save(path=self.path_dir_result)
        self.results.append(ret)

    def compute_indices_to_be_computed(self):
        """Compute the indices corresponding to the series to be computed

        Returns
        -------

        list
            Indices of the subsets for which no result file with the
            expected name exists in ``self.path_dir_result``.

        """
        index_subsets = []
        for ind_subset, subset in self.series.items():
            names_serie = subset.get_name_arrays()
            name_preproc = get_name_preproc(
                subset,
                names_serie,
                ind_subset,
                self.series.nb_series,
                self.params.saving.format,
            )
            if not (self.path_dir_result / name_preproc).exists():
                index_subsets.append(ind_subset)
        return index_subsets

    def fill_subsets_of_names_and_paths(
        self, input_queue: None, output_queues: Tuple[Dict, Dict]
    ) -> None:
        """Fill the two first queues

        Parameters
        ----------

        input_queue : None
            This work has no input queue.

        output_queues : tuple
            The pair ``(queue_subsets_of_names, queue_paths)``. The first is
            filled with the array names of each subset (keyed by subset
            index), the second with the path of each array (keyed by name).

        """
        assert input_queue is None
        queue_subsets_of_names, queue_paths = output_queues

        self.init_series()

        for ind_subset, subset in self.series.items():
            queue_subsets_of_names[ind_subset] = subset.get_name_arrays()
            for name, path in subset.get_name_path_arrays():
                queue_paths[name] = path

    def make_subsets(
        self, input_queues: Tuple[Dict, Dict], output_queue: Dict
    ) -> None:
        """Create the subsets of images

        Parameters
        ----------

        input_queues : tuple
            The pair ``(queue_subsets_of_names, queue_arrays)``. Entries are
            removed from both queues once they are no longer needed.

        output_queue : dict-like
            Receives one ``ArraySubset`` per completed subset, keyed by the
            subset index.

        """
        queue_subsets_of_names, queue_arrays = input_queues
        # Iterate over a snapshot since entries are deleted inside the loop.
        for key, names in list(queue_subsets_of_names.items()):
            # A subset can be built only once all of its arrays are loaded.
            if not all(name in queue_arrays for name in names):
                continue

            # Materialize the arrays now: entries of queue_arrays may be
            # deleted just below, which would break a lazy generator.
            arrays = tuple(queue_arrays[name] for name in names)
            serie = copy.copy(self.series.get_serie_from_index(key))
            output_queue[key] = ArraySubset(
                names=names, arrays=arrays, serie=serie
            )
            del queue_subsets_of_names[key]

            # Remove the arrays that will not be used anymore.
            for key_array in list(queue_arrays.keys()):
                if not queue_subsets_of_names.is_name_in_values(key_array):
                    del queue_arrays[key_array]
# Generic module-level alias for the topology class defined in this module.
Topology = TopologyPreproc

# Only when Sphinx has been imported: extend the module docstring with the
# text produced by `_make_doc_with_filtered_params_doc` for the topology.
if "sphinx" in sys.modules:
    __doc__ += _make_doc_with_filtered_params_doc(Topology)