Source code for fluidimage.topologies.bos

"""Topology for BOS computation (:mod:`fluidimage.topologies.bos`)
==================================================================

.. autoclass:: TopologyBOS
   :members:
   :private-members:

"""

import sys
from pathlib import Path

from fluidimage import ParamContainer
from fluidimage.data_objects.piv import get_name_bos
from fluidimage.topologies import prepare_path_dir_result
from fluidimage.topologies.base import TopologyBaseFromImages
from fluidimage.topologies.splitters import SplitterFromImages
from fluidimage.util import imread
from fluidimage.works import image2image
from fluidimage.works.bos import WorkBOS


class SplitterBos(SplitterFromImages):
    def __init__(
        self, params, num_processes, topology=None, indices_to_be_computed=None
    ):
        super().__init__(
            params,
            num_processes,
            topology=topology,
            indices_to_be_computed=indices_to_be_computed,
        )
        self.params.reference = topology.path_reference


def _imread(path):
    image = imread(path)
    return image, Path(path).name


[docs]class TopologyBOS(TopologyBaseFromImages): """Topology for BOS computation. See https://en.wikipedia.org/wiki/Background-oriented_schlieren_technique The most useful methods for the user (in particular :func:`compute`) are defined in the base class :class:`fluidimage.topologies.base.TopologyBase`. Parameters ---------- params : None A ParamContainer (created with the class method :func:`create_default_params`) containing the parameters for the computation. logging_level : str, {'warning', 'info', 'debug', ...} Logging level. nb_max_workers : None, int Maximum numbers of "workers". If None, a number is computed from the number of cores detected. If there are memory errors, you can try to decrease the number of workers. """ _short_name = "bos" Splitter = SplitterBos
[docs] @classmethod def create_default_params(cls): """Class method returning the default parameters. Typical usage:: params = TopologyPIV.create_default_params() # modify parameters here ... topo = TopologyPIV(params) """ params = ParamContainer(tag="params") super()._add_default_params_saving(params) WorkBOS._complete_params_with_default(params) params._set_child("preproc") image2image.complete_im2im_params_with_default(params.preproc) return params
def __init__(self, params, logging_level="info", nb_max_workers=None): self.params = params self.main_work = WorkBOS(params) self.serie = self.main_work.serie self.path_reference = self.main_work.path_reference path_dir = Path(self.serie.path_dir) path_dir_result, self.how_saving = prepare_path_dir_result( path_dir, params.saving.path, params.saving.postfix, params.saving.how ) self.path_dir_result = path_dir_result self.path_dir_src = Path(path_dir) super().__init__( path_dir_result=path_dir_result, logging_level=logging_level, nb_max_workers=nb_max_workers, ) queue_paths = self.add_queue("paths") queue_arrays = queue_arrays1 = self.add_queue("arrays") queue_bos = self.add_queue("bos") if params.preproc.im2im is not None: queue_arrays1 = self.add_queue("arrays1") self.add_work( "fill paths", func_or_cls=self.fill_queue_paths, output_queue=queue_paths, kind=("global", "one shot"), ) self.add_work( "read array", func_or_cls=_imread, input_queue=queue_paths, output_queue=queue_arrays, kind="io", ) if params.preproc.im2im is not None: im2im_func = image2image.get_im2im_function_from_params( params.preproc ) self.add_work( "image2image", func_or_cls=im2im_func, input_queue=queue_arrays, output_queue=queue_arrays1, kind="eat key value", ) self.add_work( "compute bos", func_or_cls=self.calcul, params_cls=params, input_queue=queue_arrays, output_queue=queue_bos, ) self.add_work( "save bos", func_or_cls=self.save_bos_object, input_queue=queue_bos, kind="io", ) self.results = []
[docs] def save_bos_object(self, obj): """Save a BOS object""" ret = obj.save(self.path_dir_result, kind="bos") self.results.append(ret)
[docs] def calcul(self, tuple_image_path): """Compute a BOS field""" return self.main_work.calcul(tuple_image_path)
def _get_name_result_from_name(self, name): return get_name_bos(name, self.serie)
[docs] def _fix_indices_images(self, indices_images): """Fix the indices images in fill_queue_paths""" if not self.path_reference.is_relative_to(self.serie.path_dir): return if not self.path_reference.name.startswith(self.serie.base_name): return try: indices_ref = tuple( self.serie.compute_indices_from_name(self.path_reference.name) ) except ValueError: return try: indices_images.remove(indices_ref) except ValueError: pass
[docs] def make_text_at_exit(self, time_since_start): """Make a text printed at exit""" txt = f"Stop compute after t = {time_since_start:.2f} s" try: nb_results = len(self.results) except AttributeError: nb_results = None if nb_results is not None and nb_results > 0: txt += f" ({nb_results} bos fields, {time_since_start / nb_results:.2f} s/field)." else: txt += "." txt += "\npath results:\n" + str(Path(self.path_dir_result).absolute()) return txt
Topology = TopologyBOS if "sphinx" in sys.modules: _params = TopologyBOS.create_default_params() __doc__ += _params._get_formatted_docs()