Source code for fluidimage.data_objects.piv

"""PIV objects (:mod:`fluidimage.data_objects.piv`)
===================================================

.. autoclass:: ArrayCouple
   :members:
   :private-members:

.. autoclass:: HeavyPIVResults
   :members:
   :private-members:

.. autoclass:: MultipassPIVResults
   :members:
   :private-members:

.. autoclass:: LightPIVResults
   :members:
   :private-members:

"""

import os

import h5netcdf
import h5py
import numpy as np

from fluidimage import ParamContainer
from fluidimage import __version__ as fluidimage_version
from fluidimage import imread
from fluidimage._version import hg_rev
from fluidimage.util import safe_eval

from .display_piv import DisplayPIV


def get_name_piv(serie, prefix="piv"):
    slicing_tuples = serie.get_slicing_tuples()
    str_indices = ""
    for idim, inds in enumerate(slicing_tuples):
        index = inds[0]
        str_index = serie.get_str_for_name_from_idim_idx(idim, index)
        if len(inds) > 1:
            last = serie.get_str_for_name_from_idim_idx(idim, inds[1] - 1)
            if last != str_index:
                if str_index.isalpha() and len(str_index) == 1:
                    sep = ""
                else:
                    sep = "-"
                str_index += sep + last

        if idim > 1:
            str_indices += serie.get_index_separators[idim - 1]
        str_indices += str_index

    name = prefix + "_" + str_indices + ".h5"
    return name


def get_name_bos(name, serie):
    name = name[len(serie.base_name) :]
    if serie.extension_file is not None:
        name = name[: -len(serie.extension_file) - 1]
    return "bos" + name + ".h5"


class DataObject:
    pass


def get_slices_from_strcrop(strcrop):
    return tuple(
        slice(*(int(i) if i else None for i in part.strip().split(":")))
        for part in strcrop.split(",")
    )


[docs]class ArrayCouple(DataObject): """Couple of arrays (images).""" def __init__( self, names=None, arrays=None, paths=None, serie=None, str_path=None, hdf5_parent=None, params_mask=None, ): self.params_mask = params_mask if str_path is not None: self._load(path=str_path) return if hdf5_parent is not None: self._load(hdf5_object=hdf5_parent["couple"]) return if serie is not None and paths is None: names = serie.get_name_arrays() if len(names) != 2: raise ValueError("serie has to contain 2 arrays.") paths = serie.get_path_arrays() if not serie.check_all_files_exist(): raise ValueError( "At least one file of this serie does not exists. \n" + str(paths) ) if paths is not None: paths = self.paths = tuple(os.path.abspath(p) for p in paths) if arrays is None: arrays = self.read_images() self.paths = paths self.names = tuple(names) self.name = self._compute_name_from_names() self.shape_images = arrays[0].shape self.arrays = self._mask_arrays(arrays) self.serie = serie def _compute_name_from_names(self): return "-".join(self.names) def _read_image(self, index): arr = imread(self.paths[index]) return arr def read_images(self): return tuple(self._read_image(i) for i in [0, 1]) def apply_mask(self, params_mask): if self.params_mask is not None and params_mask is None: raise NotImplementedError if self.params_mask is not None and params_mask == self.params_mask: return if self.params_mask is not None: raise NotImplementedError self.params_mask = params_mask self.arrays = self._mask_arrays(self.arrays) def _mask_array(self, array): if self.params_mask is None: return array if self.params_mask.strcrop is not None: indices = get_slices_from_strcrop(self.params_mask.strcrop) array = array[indices] return array def _mask_arrays(self, arrays): return tuple(self._mask_array(arr) for arr in arrays) def get_arrays(self): if not hasattr(self, "arrays"): self.arrays = self._mask_arrays(self.read_images()) return self.arrays def save(self, path=None, hdf5_parent=None): if path is not None: raise NotImplementedError if not isinstance(hdf5_parent, (h5py.File,)): raise NotImplementedError hdf5_parent.create_group("couple") group = hdf5_parent["couple"] assert isinstance(self.names, tuple) group.attrs["names"] = repr(self.names).encode() group.attrs["paths"] = repr(self.paths).encode() group.create_dataset("shape_images", data=self.shape_images) def _load(self, path=None, hdf5_object=None): if path is not None: raise NotImplementedError names = hdf5_object.attrs["names"] paths = hdf5_object.attrs["paths"] if isinstance(names, bytes): names = names.decode() paths = paths.decode() self.names = safe_eval(names) self.paths = safe_eval(paths) assert isinstance(self.names, tuple) try: self.shape_images = hdf5_object["shape_images"][...] except KeyError: pass
class ArrayCoupleBOS(ArrayCouple): """Couple of arrays (images).""" def _compute_name_from_names(self): return self.names[-1]
[docs]class HeavyPIVResults(DataObject): """Heavy PIV results containing displacements and correlation. Attributes ---------- xs, ys: 1d array of size `num_vectors` Positions of the vectors in pixels. Depend on the images. deltaxs, deltays: 1d array of size `num_vectors` Raw PIV results. correls: list of `num_vectors` 2d arrays Correlation matrices for each vector. correls_max: 1d array of size `num_vectors` Maximum correlation for each vector. deltaxs_approx, deltays_approx: 1d array of size `num_vectors_next_pass` Displacements interpolated on a grid that do not depend on the images. ixvecs_approx, iyvecs_approx: 1d array of size `num_vectors_next_pass` Positions in pixels of the vectors in ``deltaxs_approx``, ``deltays_approx``. deltaxs_final, deltays_final, ixvecs_final, iyvecs_final: Equivalent to the `_approx` variables but for the last pass and of size ``num_vectors``. """ _keys_to_be_saved = [ "xs", "ys", "deltaxs", "deltays", "correls_max", "deltaxs_approx", "deltays_approx", "ixvecs_approx", "iyvecs_approx", "deltaxs_final", "deltays_final", "ixvecs_final", "iyvecs_final", ] _dict_to_be_saved = ["errors", "deltaxs_wrong", "deltays_wrong"] def __init__( self, deltaxs=None, deltays=None, xs=None, ys=None, errors=None, correls_max=None, correls=None, couple=None, params=None, str_path=None, hdf5_object=None, secondary_peaks=None, indices_no_displacement=None, ): if hdf5_object is not None: if couple is not None: self.couple = couple if params is not None: self.params = params self._load_from_hdf5_object(hdf5_object) return if str_path is not None: self._load(str_path) return self.deltaxs = deltaxs self.deltays = deltays self.ys = ys self.xs = xs self.errors = errors self.correls_max = correls_max self.correls = correls self.couple = couple self.params = params self.secondary_peaks = secondary_peaks self.indices_no_displacement = indices_no_displacement def get_images(self): return self.couple.read_images() def display( self, show_interp=False, scale=0.2, show_error=True, pourcent_histo=99, hist=False, show_correl=True, xlim=None, ylim=None, ): try: im0, im1 = self.get_images() except IOError: im0, im1 = None, None return DisplayPIV( im0, im1, self, show_interp=show_interp, scale=scale, show_error=show_error, pourcent_histo=pourcent_histo, hist=hist, show_correl=show_correl, xlim=xlim, ylim=ylim, ) def _get_name(self, kind): serie = self.couple.serie if kind is None: if serie is None: return self.couple.name + ".h5" return get_name_piv(serie, prefix="piv") elif kind == "bos": return get_name_bos(self.couple.name, serie) def save(self, path=None, out_format=None, kind=None): name = self._get_name(kind) if path is not None: path_file = os.path.join(path, name) else: path_file = name if out_format == "uvmat": with h5netcdf.File(path_file, "w") as file: self._save_as_uvmat(file) else: with h5py.File(path_file, "w") as file: file.attrs["class_name"] = "HeavyPIVResults" file.attrs["module_name"] = "fluidimage.data_objects.piv" self._save_in_hdf5_object(file) return path_file def _save_in_hdf5_object(self, file, tag="piv0"): if "class_name" not in file.attrs.keys(): file.attrs["class_name"] = "HeavyPIVResults" file.attrs["module_name"] = "fluidimage.data_objects.piv" if "params" not in file.keys(): self.params._save_as_hdf5(hdf5_parent=file) if "couple" not in file.keys(): self.couple.save(hdf5_parent=file) g_piv = file.create_group(tag) g_piv.attrs["class_name"] = "HeavyPIVResults" g_piv.attrs["module_name"] = "fluidimage.data_objects.piv" for k in self._keys_to_be_saved: if k in self.__dict__ and self.__dict__[k] is not None: g_piv.create_dataset(k, data=self.__dict__[k]) for name_dict in self._dict_to_be_saved: try: d = self.__dict__[name_dict] if d is None: raise KeyError except KeyError: pass else: g = g_piv.create_group(name_dict) keys = list(d.keys()) values = list(d.values()) try: for i, k in enumerate(keys): keys[i] = k.encode() except AttributeError: pass try: for i, k in enumerate(values): values[i] = k.encode() except AttributeError: pass g.create_dataset("keys", data=keys) g.create_dataset("values", data=values) if "deltaxs_tps" in self.__dict__: g = g_piv.create_group("deltaxs_tps") for i, arr in enumerate(self.deltaxs_tps): g.create_dataset(f"subdom{i}", data=arr) g = g_piv.create_group("deltays_tps") for i, arr in enumerate(self.deltays_tps): g.create_dataset(f"subdom{i}", data=arr) def _load(self, path): self.file_name = os.path.basename(path) with h5py.File(path, "r") as file: self._load_from_hdf5_object(file["piv0"]) def _load_from_hdf5_object(self, g_piv): file = g_piv.parent if not hasattr(self, "params"): self.params = ParamContainer(hdf5_object=file["params"]) try: params_mask = self.params.mask except AttributeError: params_mask = None if not hasattr(self, "couple"): self.couple = ArrayCouple(hdf5_parent=file, params_mask=params_mask) for k in self._keys_to_be_saved: if k in g_piv: dataset = g_piv[k] self.__dict__[k] = dataset[:] for name_dict in self._dict_to_be_saved: try: g = g_piv[name_dict] keys = g["keys"] values = g["values"] dictionary = {} for key, value in zip(keys, values): try: key = k.decode() except AttributeError: pass try: value = value.decode() except AttributeError: pass dictionary[key] = value self.__dict__[name_dict] = dictionary except KeyError: pass if "deltaxs_tps" in g_piv.keys(): g = g_piv["deltaxs_tps"] self.deltaxs_tps = [] for arr in g.keys(): self.deltaxs_tps.append(g[arr][...]) g = g_piv["deltays_tps"] self.deltays_tps = [] for arr in g.keys(): self.deltays_tps.append(g[arr][...])
[docs] def get_grid_pixel(self, index_pass): """Recompute 1d arrays containing the approximate positions of the vectors Useful to compute a grid on which we can interpolate the displacement fields. Parameters ---------- index_pass: int Index of the pass Returns ------- xs1d: np.ndarray Indices (2nd, direction "x") of the pixel in the image ys1d: np.ndarray Indices (1st, direction "y") of the pixel in the image """ from ..postproc.piv import get_grid_pixel return get_grid_pixel(self.params, self.couple.shape_images, index_pass)
[docs]class MultipassPIVResults(DataObject): """Result of a multipass PIV computation.""" def __init__(self, str_path=None): self.passes = [] if str_path is not None: self._load(str_path) def display( self, i=-1, show_interp=False, scale=0.2, show_error=True, pourcent_histo=99, hist=False, show_correl=True, xlim=None, ylim=None, ): r = self.passes[i] return r.display( show_interp=show_interp, scale=scale, show_error=show_error, pourcent_histo=pourcent_histo, hist=hist, show_correl=show_correl, xlim=xlim, ylim=ylim, ) def __getitem__(self, key): return self.passes[key] def append(self, results): i = len(self.passes) self.passes.append(results) self.__dict__[f"piv{i}"] = results def _get_name(self, kind): if hasattr(self, "file_name"): return self.file_name r = self.passes[0] return r._get_name(kind) def save(self, path=None, out_format=None, kind=None): name = self._get_name(kind) if path is not None: root, ext = os.path.splitext(path) if ext in [".h5", ".nc"]: path_file = path else: path_file = os.path.join(path, name) else: path_file = name if out_format == "uvmat": with h5netcdf.File(path_file, "w") as file: self._save_as_uvmat(file) else: with h5py.File(path_file, "w") as file: file.attrs["class_name"] = "MultipassPIVResults" file.attrs["module_name"] = "fluidimage.data_objects.piv" file.attrs["Conventions"] = ( "fluidimage.data_objects.piv.MultipassPIVResults v1" ) file.attrs["nb_passes"] = len(self.passes) file.attrs["fluidimage_version"] = fluidimage_version file.attrs["fluidimage_hg_rev"] = hg_rev for i, r in enumerate(self.passes): r._save_in_hdf5_object(file, tag=f"piv{i}") return path_file def _load(self, path): self.file_name = os.path.basename(path) with h5py.File(path, "r") as file: self.params = ParamContainer(hdf5_object=file["params"]) try: params_mask = self.params.mask except AttributeError: params_mask = None self.couple = ArrayCouple(hdf5_parent=file, params_mask=params_mask) nb_passes = file.attrs["nb_passes"] for ip in range(nb_passes): g = file[f"piv{ip}"] self.append( HeavyPIVResults( hdf5_object=g, couple=self.couple, params=self.params ) ) def _save_as_uvmat(self, file): file.dimensions = {"nb_coord": 2, "nb_bounds": 2} for i, ir in enumerate(self.passes): iuvmat = i + 1 str_i = str(iuvmat) str_nb_vec = "nb_vec_" + str_i file.dimensions[str_nb_vec] = ir.xs.size tmp = np.zeros(ir.deltaxs.shape).astype("float32") inds = np.where(~np.isnan(ir.deltaxs)) file.create_variable(f"Civ{iuvmat}_X", (str_nb_vec,), data=ir.xs) file.create_variable(f"Civ{iuvmat}_Y", (str_nb_vec,), data=ir.ys) file.create_variable( f"Civ{iuvmat}_U", (str_nb_vec,), data=np.nan_to_num(ir.deltaxs) ) file.create_variable( f"Civ{iuvmat}_V", (str_nb_vec,), data=np.nan_to_num(ir.deltays) ) if ir.params.multipass.use_tps: str_nb_subdom = f"nb_subdomain_{iuvmat}" try: file.dimensions[str_nb_subdom] = np.shape(ir.deltaxs_tps)[0] file.dimensions[f"nb_tps{iuvmat}"] = np.shape(ir.deltaxs_tps)[ 1 ] tmp[inds] = ir.deltaxs_smooth file.create_variable( f"Civ{iuvmat}_U_smooth", (str_nb_vec,), data=tmp ) tmp[inds] = ir.deltays_smooth file.create_variable( f"Civ{iuvmat}_V_smooth", (str_nb_vec,), data=tmp ) file.create_variable( f"Civ{iuvmat}_U_tps", (str_nb_subdom, f"nb_vec_tps_{iuvmat}"), data=ir.deltaxs_tps, ) file.create_variable( f"Civ{iuvmat}_V_tps", (str_nb_subdom, f"nb_vec_tps_{iuvmat}"), data=ir.deltays_tps, ) tmp = [None] * file.dimensions[str_nb_subdom] for j in range(file.dimensions[str_nb_subdom]): tmp[j] = np.shape(ir.deltaxs_tps[j])[0] file.create_variable( f"Civ{iuvmat}_NbCentres", (f"nb_subdomain_{iuvmat}",), data=tmp, ) except: print(f"no tps field at passe n {iuvmat}") file.create_variable( f"Civ{iuvmat}_C", (str_nb_vec,), data=ir.correls_max ) tmp = np.zeros(ir.deltaxs.shape).astype("float32") indsnan = np.where(np.isnan(ir.deltaxs)) tmp[indsnan] = 1 file.create_variable(f"Civ{iuvmat}_FF", (str_nb_vec,), data=tmp) # mettre bonne valeur de F correspondant a self.piv0.error... file.create_variable(f"Civ{iuvmat}_F", (str_nb_vec,), data=tmp) # ADD # file.create_variable('Civ1_Coord_tps', # ('nb_subdomain_1', 'nb_coord', 'nb_tps_1'), # data=???) # ADD attributes def make_light_result(self, ind_pass=-1): piv = self.passes[ind_pass] if ind_pass == -1: deltaxs = piv.deltaxs_final deltays = piv.deltays_final ixvec = piv.ixvecs_final iyvec = piv.iyvecs_final else: deltaxs = piv.deltaxs_approx deltays = piv.deltays_approx ixvec = piv.ixvecs_approx iyvec = piv.ixvecs_approx if hasattr(self, "file_name"): file_name = self.file_name else: file_name = None return LightPIVResults( deltaxs, deltays, ixvec, iyvec, couple=piv.couple, params=piv.params, file_name=file_name, )
[docs] def get_grid_pixel(self, index_pass=-1): """Recompute 1d arrays containing the approximate positions of the vectors Useful to compute a grid on which we can interpolate the displacement fields. Parameters ---------- index_pass: int Index of the pass Returns ------- xs1d: np.ndarray Indices (2nd, direction "x") of the pixel in the image ys1d: np.ndarray Indices (1st, direction "y") of the pixel in the image """ piv = self.passes[index_pass] return piv.get_grid_pixel(index_pass)
[docs]class LightPIVResults(DataObject): _keys_to_be_saved = [ "ixvecs_final", "iyvecs_final", "deltaxs_final", "deltays_final", ] def __repr__(self): return f"{type(self).__name__}()" def __init__( self, deltaxs_approx=None, deltays_approx=None, ixvecs_grid=None, iyvecs_grid=None, correls_max=None, couple=None, params=None, str_path=None, hdf5_object=None, file_name=None, ): if file_name is not None: self.file_name = file_name if hdf5_object is not None: if couple is not None: self.couple = couple if params is not None: self.params = params self._load_from_hdf5_object(hdf5_object) return if str_path is not None: self._load(str_path) return self.deltaxs_final = deltaxs_approx self.deltays_final = deltays_approx self.couple = couple self.params = params self.ixvecs_final = ixvecs_grid self.iyvecs_final = iyvecs_grid def _get_name(self, kind): if hasattr(self, "file_name"): return self.file_name[:-3] + "_light.h5" serie = self.couple.serie str_ind0 = serie.compute_str_indices_from_indices( *[inds[0] for inds in serie.get_slicing_tuples()] ) str_ind1 = serie.compute_str_indices_from_indices( *[inds[1] - 1 for inds in serie.get_slicing_tuples()] ) name = "piv_" + serie.base_name + str_ind0 + "-" + str_ind1 + "_light.h5" return name def save(self, path=None, out_format=None, kind=None): name = self._get_name(kind) if path is not None: path_file = os.path.join(path, name) else: path_file = name with h5py.File(path_file, "w") as file: file.attrs["class_name"] = "LightPIVResults" file.attrs["module_name"] = "fluidimage.data_objects.piv" file.attrs["Conventions"] = ( "fluidimage.data_objects.piv.LightPIVResults v1" ) self._save_in_hdf5_object(file, tag="piv") return self def _save_in_hdf5_object(self, file, tag="piv"): if "class_name" not in file.attrs.keys(): file.attrs["class_name"] = "LightPIVResults" file.attrs["module_name"] = "fluidimage.data_objects.piv" if "params" not in file.keys(): self.params._save_as_hdf5(hdf5_parent=file) if "couple" not in file.keys(): self.couple.save(hdf5_parent=file) g_piv = file.create_group(tag) g_piv.attrs["class_name"] = "LightPIVResults" g_piv.attrs["module_name"] = "fluidimage.data_objects.piv" for k in self._keys_to_be_saved: if k in self.__dict__: g_piv.create_dataset(k, data=self.__dict__[k]) def _load(self, path): with h5py.File(path, "r") as file: self.params = ParamContainer(hdf5_object=file["params"]) try: params_mask = self.params.mask except AttributeError: params_mask = None self.couple = ArrayCouple(hdf5_parent=file, params_mask=params_mask) with h5py.File(path, "r") as file: keys = [(key) for key in file.keys() if "piv" in key] self._load_from_hdf5_object(file[max(keys)]) def _load_from_hdf5_object(self, g_piv): file = g_piv.parent if not hasattr(self, "params"): self.params = ParamContainer(hdf5_object=file["params"]) try: params_mask = self.params.mask except AttributeError: params_mask = None if not hasattr(self, "couple"): self.couple = ArrayCouple(hdf5_parent=file, params_mask=params_mask) for k in self._keys_to_be_saved: if k in g_piv: dataset = g_piv[k] self.__dict__[k] = dataset[:]