Source code for fluidimage.topologies.surface_tracking

"""Topology for surface tracking(:mod:`fluidimage.topologies.surface_tracking`)
===============================================================================

.. autoclass:: TopologySurfaceTracking
   :members:
   :private-members:

.. todo::

   Make this code usable:

   - good unittests
   - tutorial with ipynb
   - example integrated in the documentation

"""

import sys
from pathlib import Path

from fluiddyn.io.image import imsave_h5
from fluidimage import ParamContainer, SerieOfArraysFromFiles, SeriesOfArrays
from fluidimage.topologies import TopologyBase, prepare_path_dir_result
from fluidimage.util import imread, logger
from fluidimage.works import image2image
from fluidimage.works.surface_tracking import WorkSurfaceTracking


[docs]class TopologySurfaceTracking(TopologyBase): """Topology for SurfaceTracking. Parameters ---------- params : None A ParamContainer containing the parameters for the computation. logging_level : str, {'warning', 'info', 'debug', ...} Logging level. nb_max_workers : None, int Maximum numbers of "workers". If None, a number is computed from the number of cores detected. If there are memory errors, you can try to decrease the number of workers. """ _short_name = "surf"
[docs] @classmethod def create_default_params(cls): """Class method returning the default parameters.""" params = ParamContainer(tag="params") WorkSurfaceTracking._complete_params_with_default(params) params._set_child( "images", attribs={ "path": "", "path_ref": "", "str_subset_ref": None, "str_subset": None, }, ) params.images._set_doc( """ Parameters indicating the input image set. - path : str, {''} String indicating the input images (can be a full path towards an image file or a string given to `glob`). - path_ref : str, {''} String indicating the reference input images (can be a full path towards an image file or a string given to `glob`). - str_subset_ref : None String indicating as a Python slicing how to select reference images from the serie of reference images on the disk (in order to compute k_x value necessary for gain and filter). If None, no selection so all images are going to be processed. - str_subset : None String indicating as a Python slicing how to select images from the serie of images on the disk. If None, no selection so all images are going to be processed. """ ) super()._add_default_params_saving(params) params._set_child("preproc") image2image.complete_im2im_params_with_default(params.preproc) return params
def __init__(self, params, logging_level="info", nb_max_workers=None): self.params = params if params.surface_tracking is None: raise ValueError("params.surface_tracking has to be set.") self.serie = SerieOfArraysFromFiles( params.images.path, params.images.str_subset ) self.series = SeriesOfArrays( params.images.path, "i:i+" + str(self.serie.get_slicing_tuples()[0][2] + 1) + ":" + str(self.serie.get_slicing_tuples()[0][2]), ind_start=self.serie.get_slicing_tuples()[0][0], ind_stop=self.serie.get_slicing_tuples()[0][1] - 1, ind_step=self.serie.get_slicing_tuples()[0][2], ) path_dir = self.serie.path_dir path_dir_result, self.how_saving = prepare_path_dir_result( path_dir, params.saving.path, params.saving.postfix, params.saving.how ) self.path_dir_result = path_dir_result self.path_dir_src = Path(path_dir) self.surface_tracking_work = WorkSurfaceTracking(params) super().__init__( path_dir_result=path_dir_result, logging_level=logging_level, nb_max_workers=nb_max_workers, ) queue_paths = self.add_queue("paths") queue_couples_of_names = self.add_queue("couples of names") queue_arrays = queue_arrays1 = self.add_queue("arrays") queue_angles = self.add_queue("angles") queue_couples_of_arrays = self.add_queue( "couples of corrected angles and angles" ) queue_mod0_angles = self.add_queue("corrected angles copy") queue_mod_angles = self.add_queue("corrected angles") queue_heights_and_shapes = self.add_queue("heights and shapes") queue_heights = self.add_queue("heights") if params.preproc.im2im is not None: queue_arrays1 = self.add_queue("arrays1") self.add_work( "fill_path", self.fill_queue_paths, output_queue=(queue_paths, queue_couples_of_names), kind="one shot", ) self.add_work( "read_array", imread, input_queue=queue_paths, output_queue=queue_arrays, kind="io", ) if params.preproc.im2im is not None: im2im_func = image2image.get_im2im_function_from_params( params.preproc ) self.add_work( "image2image", func_or_cls=im2im_func, input_queue=queue_arrays, output_queue=queue_arrays1, kind="eat key value", ) self.add_work( "process_frame", self.surface_tracking_work.process_frame_func, input_queue=queue_arrays1, output_queue=queue_angles, ) self.add_work( "create_couple", self.make_couples, input_queue=(queue_mod0_angles, queue_angles, queue_couples_of_names), output_queue=(queue_mod_angles, queue_couples_of_arrays), kind="global", ) self.add_work( "correct_couple_of_phases", self.surface_tracking_work.correctcouple, input_queue=queue_couples_of_arrays, output_queue=queue_mod0_angles, ) self.add_work( "calcul_height", self.surface_tracking_work.calculheight_func, input_queue=queue_mod_angles, output_queue=queue_heights_and_shapes, ) self.add_work( "set_borders_zero", self.surface_tracking_work.set_borders_zero_func, input_queue=queue_heights_and_shapes, output_queue=queue_heights, ) self.add_work( "save", self.save_image, input_queue=queue_heights, kind=("io", "eat key value"), )
[docs] def make_couples(self, input_queues, output_queues): """correctphase""" queue_couples_of_names = input_queues[2] queue_angles = input_queues[1] queue_mod0_angles = input_queues[0] queue_couple = output_queues[1] if not (queue_couples_of_names): for key in tuple(queue_mod0_angles): output_queues[0][key] = queue_mod0_angles[key] del queue_mod0_angles[key] if not (queue_angles): print("no queue") return for key, couple in tuple(queue_couples_of_names.items()): # if correspondant arrays are available, make an array couple if couple[0] is couple[1]: if couple[0] in queue_angles.keys(): array1 = queue_angles[couple[0]] array2 = queue_angles[couple[0]] queue_couple[couple[0]] = (array1, array2) del queue_angles[couple[0]] del queue_couples_of_names[key] elif ( couple[0] in queue_mod0_angles.keys() and couple[1] in queue_angles.keys() ): array1 = queue_mod0_angles[couple[0]] array2 = queue_angles[couple[1]] queue_couple[couple[1]] = (array1, array2) del queue_angles[couple[1]] del queue_couples_of_names[key] output_queues[0][couple[0]] = queue_mod0_angles[couple[0]] del queue_mod0_angles[couple[0]]
def save_image(self, tuple_path_image): path, image = tuple_path_image name_file = Path(path).name path_out = self.path_dir_result / name_file imsave_h5(path_out, image, splitext=False) def fill_queue_paths(self, input_queue, output_queues): assert input_queue is None queue_paths = output_queues[0] queue_couples_of_names = output_queues[1] serie = self.serie if len(serie) == 0: logger.warning("add 0 image. No image to process.") return names = serie.get_name_arrays() for name in names: path_im_output = self.path_dir_result / name path_im_input = str(self.path_dir_src / name) if self.how_saving == "complete": if not path_im_output.exists(): queue_paths[name] = path_im_input else: queue_paths[name] = path_im_input if len(names) == 0: if self.how_saving == "complete": logger.warning( 'topology in mode "complete" and work already done.' ) else: logger.warning("Nothing to do") return nb_names = len(names) logger.info(f"Add {nb_names} images to compute.") logger.info("First files to process: " + str(names[:4])) logger.debug("All files: %s", names) series = self.series if not series: logger.warning("add 0 couple. No phase to correct.") return nb_series = len(series) logger.info(f"Add {nb_series} phase to correct.") for iserie, serie in enumerate(series): if iserie > 1: break logger.info( "Files of serie {}: {}".format(iserie, serie.get_name_arrays()) ) # for the first corrected angle : corrected_angle = angle ind_serie, serie = next(series.items()) name = serie.get_name_arrays()[0] queue_couples_of_names[ind_serie - 1] = (name, name) for ind_serie, serie in series.items(): queue_couples_of_names[ind_serie] = serie.get_name_arrays()
Topology = TopologySurfaceTracking if "sphinx" in sys.modules: params = TopologySurfaceTracking.create_default_params() __doc__ += params._get_formatted_docs()