# Source code for fluidimage.topologies.mean

"""Mean images topology

.. autoclass:: TopologyMeanImage
   :members:
   :private-members:

"""

import argparse
import os
from pathlib import Path

import h5py
import numpy as np
from PIL import Image

import fluidimage
from fluiddyn.util.serieofarrays import SerieOfArraysFromFiles
from fluidimage import ParamContainer
from fluidimage.topologies.base import TopologyBaseFromImages
from fluidimage.topologies.splitters import SplitterFromImages
from fluidimage.util import imread
from fluidimage.works import BaseWorkFromImage

# from transonic import boost, Array


# A2d = Array[np.uint32, "2d", "C"]


# x2 speedup of this operation but this is clearly not the bottleneck yet...
# @boost
# def sum_4_2darrays(arr0: A2d, arr1: A2d, arr2: A2d, arr3: A2d):
#     """Sum 4 2d arrays"""
#     n0, n1 = arr0.shape
#     for i0 in range(n0):
#         for i1 in range(n1):
#             arr0[i0, i1] = (
#                 arr0[i0, i1] + arr1[i0, i1] + arr2[i0, i1] + arr3[i0, i1]
#             )
#     return arr0


class TopologyMeanImage(TopologyBaseFromImages):
    """Compute in parallel the mean image."""

    # NOTE: the class docstring above is also used verbatim as the
    # description of the command-line parser (see ``parse_args``), so it is
    # runtime text and must not be reworded casually.

    _short_name = "mean"
    Splitter = SplitterFromImages

    # Mean image, cast back to the dtype of the first image of the serie.
    # Only set by ``final_seq_work``.
    result: np.ndarray
    # Path of the PNG file written by ``final_seq_work``.
    path_result: Path

    @classmethod
    def create_default_params(cls):
        """Build and return the default parameter container.

        The container is tagged ``"params"`` and is completed in place by
        the parent class (saving parameters) and by ``BaseWorkFromImage``.
        """
        default_params = ParamContainer(tag="params")
        super()._add_default_params_saving(default_params)
        BaseWorkFromImage._complete_params_with_default(default_params)
        return default_params

    def __init__(self, params, logging_level="info", nb_max_workers=None):
        """Declare the queues and the works of the topology.

        Parameters
        ----------
        params
            Parameter container; ``params.images.path`` and
            ``params.images.str_subset`` select the images.
        logging_level
            Forwarded unchanged to the parent class.
        nb_max_workers
            Forwarded unchanged to the parent class.
        """
        images_params = params.images
        # The serie has to exist before calling the parent constructor
        # because its directory is used as the source directory.
        self.serie = SerieOfArraysFromFiles(
            images_params.path, images_params.str_subset
        )

        super().__init__(
            params=params,
            path_dir_src=self.serie.path_dir,
            logging_level=logging_level,
            nb_max_workers=nb_max_workers,
        )

        paths = self.add_queue("paths")
        arrays = self.add_queue("arrays")
        # ``main`` asserts that this queue is a plain ``list`` of
        # ``(array, number_of_images)`` tuples.
        tmp_arrays = self.add_queue("tmp_arrays", "list")

        # paths -> arrays -> partial sums.  The meaning of the ``kind``
        # strings is defined by the topology framework, not in this module.
        self.add_work(
            "fill_path",
            self.fill_queue_paths,
            output_queue=paths,
            kind="one shot",
        )
        self.add_work(
            "read_array",
            imread,
            input_queue=paths,
            output_queue=arrays,
            kind="io",
        )
        self.add_work(
            "main",
            self.main,
            input_queue=(paths, arrays, tmp_arrays),
            kind="global",
        )

        self.results = []

        # Remember the dtype of the first image: the mean is cast back to it.
        self.original_dtype = self.serie.get_array_from_index(0).dtype

    def reduce_queue_tmp_arrays4(self, queue_tmp_arrays):
        """Merge the partial sums four by four, in place.

        While at least four ``(array, count)`` items are present, the last
        four are popped, summed, and the merged item is inserted at the
        front of the list.  Each merge adds three entries to
        ``self.results`` (four items became one).
        """
        while len(queue_tmp_arrays) >= 4:
            (arr0, n0), (arr1, n1), (arr2, n2), (arr3, n3) = [
                queue_tmp_arrays.pop() for _ in range(4)
            ]
            # A compiled variant of this sum is kept, commented out, at the
            # top of the module (``sum_4_2darrays``).
            merged_array = arr0 + arr1 + arr2 + arr3
            merged_count = n0 + n1 + n2 + n3
            queue_tmp_arrays.insert(0, (merged_array, merged_count))
            self.results.extend([merged_count] * 3)

    def reduce_queue_tmp_arrays2(self, queue_tmp_arrays):
        """Merge the partial sums two by two, in place.

        Same principle as ``reduce_queue_tmp_arrays4`` with pairs; it leaves
        at most one item in the list.  Each merge adds one entry to
        ``self.results`` (two items became one).
        """
        while len(queue_tmp_arrays) >= 2:
            first_array, first_count = queue_tmp_arrays.pop()
            second_array, second_count = queue_tmp_arrays.pop()
            merged_count = first_count + second_count
            queue_tmp_arrays.insert(
                0, (first_array + second_array, merged_count)
            )
            self.results.append(merged_count)

    def main(self, input_queues, output_queue):
        """Accumulate the available arrays and, when done, dump the sum.

        Parameters
        ----------
        input_queues
            The three queues ``(paths, arrays, tmp_arrays)``.
        output_queue
            Unused (deleted immediately).

        Every available array is converted to ``uint32`` and appended to
        the list of partial sums with a count of 1.  Once the path and
        array queues are empty and the executor reports no working io
        worker, the remaining partial sums are merged and written to
        ``tmp_sum<index_process>.h5`` in ``executor.path_job_data``
        (dataset ``"arr"``, attribute ``"num_images"``).
        """
        del output_queue
        queue_paths, queue_arrays, queue_tmp_arrays = input_queues
        assert isinstance(queue_tmp_arrays, list)

        while queue_arrays:
            _name, image = queue_arrays.pop_first_item()
            queue_tmp_arrays.append((image.astype(np.uint32), 1))
            self.reduce_queue_tmp_arrays4(queue_tmp_arrays)

        # Not finished yet: something can still be read or is being read.
        if queue_paths or queue_arrays:
            return

        executor = self.executor
        # Executors without the attribute are treated as having no busy
        # io worker.
        if not getattr(executor, "nb_working_workers_io", 0) == 0:
            return

        self.reduce_queue_tmp_arrays2(queue_tmp_arrays)
        if not queue_tmp_arrays:
            # Nothing (left) to save.
            return
        assert len(queue_tmp_arrays) == 1, queue_tmp_arrays
        arr_result, n_result = queue_tmp_arrays.pop()

        # Executors without ``index_process`` use index 0.
        index_process = getattr(executor, "index_process", 0)
        path_tmp = executor.path_job_data / f"tmp_sum{index_process:03d}.h5"

        with h5py.File(path_tmp, "w") as h5file:
            h5file.create_dataset("arr", data=arr_result)
            h5file.attrs["num_images"] = n_result

    def final_seq_work(self):
        """Combine the temporary sums, save the mean image and clean up.

        Reads every ``tmp_sum*.h5`` file of ``executor.path_job_data`` (in
        sorted order), merges them, divides by the total number of images
        and casts to the original dtype.  The result is stored in
        ``self.result`` and saved as PNG next to ``self.path_dir_result``
        (same name plus ``".png"``).  The temporary files are then deleted.
        """
        tmp_files = sorted(self.executor.path_job_data.glob("tmp_sum*.h5"))

        partial_sums = []
        for tmp_file in tmp_files:
            with h5py.File(tmp_file, "r") as h5file:
                # ``[...]`` loads the data in memory before the file closes.
                partial_array = h5file["arr"][...]
                partial_count = h5file.attrs["num_images"]
            partial_sums.append((partial_array, partial_count))
            self.reduce_queue_tmp_arrays4(partial_sums)

        self.reduce_queue_tmp_arrays2(partial_sums)
        assert len(partial_sums) == 1

        total_array, total_count = partial_sums[0]
        self.result = (total_array / total_count).astype(self.original_dtype)
        self.results.append(total_count)

        result_dir = self.path_dir_result
        self.path_result = result_dir.with_name(result_dir.name + ".png")

        image = Image.fromarray(self.result)
        image.save(self.path_result, "PNG")
        image.close()

        for tmp_file in tmp_files:
            tmp_file.unlink()


def parse_args():
    """Parse the arguments of the command line

    Returns the ``argparse.Namespace`` with the attributes ``path``,
    ``verbose``, ``version``, ``executor``, ``nb_max_workers``, ``subset``
    and ``output``.
    """
    parser = argparse.ArgumentParser(
        description=TopologyMeanImage.__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )

    # (names or flags, keyword arguments), registered in this order.
    argument_specs = (
        (
            ("path",),
            {
                "help": "Path file or directory.",
                "type": str,
                "nargs": "?",
                "default": os.getcwd(),
            },
        ),
        (("-v", "--verbose"), {"help": "verbose mode", "action": "count"}),
        (
            ("-V", "--version"),
            {"help": "Print fluidimage version and exit", "action": "count"},
        ),
        (
            ("--executor",),
            {"help": "Name of the executor.", "type": str, "default": None},
        ),
        (
            ("-np", "--nb-max-workers"),
            {
                "help": "Maximum number of workers/processes.",
                "type": int,
                "default": None,
            },
        ),
        (
            ("--subset",),
            {"help": "Subset of images.", "type": str, "default": None},
        ),
        (
            ("-o", "--output"),
            {
                "help": "Output path (without the .png extension).",
                "type": str,
                "default": None,
            },
        ),
    )
    for names, options in argument_specs:
        parser.add_argument(*names, **options)

    return parser.parse_args()


# Module-level alias of the topology class; ``main`` goes through it.
Topology = TopologyMeanImage


def main():
    """Main function for fluidimage-mean

    Parses the command line, prints the version and returns if requested,
    otherwise fills the default parameters from the arguments and runs the
    topology.  When no executor is given, ``"exec_sequential"`` is used
    for exactly one worker and ``"exec_async_sequential"`` otherwise.
    """
    args = parse_args()

    if args.version:
        print(f"fluidimage {fluidimage.__version__}")
        return

    if args.executor is None:
        args.executor = (
            "exec_sequential"
            if args.nb_max_workers == 1
            else "exec_async_sequential"
        )

    print(args)

    params = Topology.create_default_params()
    images_params = params.images
    images_params.path = str(args.path)
    images_params.str_subset = args.subset
    params.saving.path = args.output

    Topology(params).compute(
        args.executor, nb_max_workers=args.nb_max_workers
    )