Source code for fluidimage.works.piv.singlepass

"""Piv work and subworks
========================

.. autoclass:: BaseWorkPIV
   :members:
   :private-members:

.. autoclass:: FirstWorkPIV
   :members:
   :private-members:

.. autoclass:: WorkPIVFromDisplacement
   :members:
   :private-members:

"""

from copy import deepcopy

import numpy as np

from fluiddyn.util.serieofarrays import SerieOfArraysFromFiles

from ...calcul.correl import correlation_classes
from ...calcul.errors import PIVError
from ...calcul.interpolate.griddata import griddata
from ...calcul.interpolate.thin_plate_spline_subdom import ThinPlateSplineSubdom
from ...calcul.subpix import SubPix
from ...data_objects.piv import ArrayCouple, HeavyPIVResults
from ..with_mask import BaseWorkWithMask


class InterpError(ValueError):
    pass


def _isint(obj):
    return isinstance(obj, (int, np.integer))


[docs]class BaseWorkPIV(BaseWorkWithMask): """Base class for PIV. This class is meant to be subclassed, not instantiated directly. """
[docs] @classmethod def _complete_params_with_default(cls, params): pass
def _init_shape_crop(self, shape_crop_im0, shape_crop_im1): if shape_crop_im1 is None: shape_crop_im1 = shape_crop_im0 if _isint(shape_crop_im0): shape_crop_im0 = (shape_crop_im0, shape_crop_im0) elif ( isinstance(shape_crop_im0, (tuple, list)) and len(shape_crop_im0) == 2 ): shape_crop_im0 = tuple(shape_crop_im0) else: raise NotImplementedError( "For now, shape_crop_im0 has to be one or two integer!" ) if _isint(shape_crop_im1): shape_crop_im1 = (shape_crop_im1, shape_crop_im1) elif ( isinstance(shape_crop_im1, (tuple, list)) and len(shape_crop_im1) == 2 ): shape_crop_im1 = tuple(shape_crop_im1) else: raise NotImplementedError( "For now, shape_crop_im1 has to be one or two integer!" ) if ( shape_crop_im1[0] > shape_crop_im0[0] or shape_crop_im1[1] > shape_crop_im0[1] ): raise ValueError( "shape_crop_im1 must be inferior or equal to shape_crop_im0" ) self.shape_crop_im0 = tuple(int(n) for n in shape_crop_im0) self.shape_crop_im1 = tuple(int(n) for n in shape_crop_im1) def _init_correl(self): try: correl_cls = correlation_classes[self.params.piv0.method_correl] except KeyError: raise ValueError( "params.piv0.method_correl should be in " + str(list(correlation_classes.keys())) ) self.correl = correl_cls( im0_shape=self.shape_crop_im0, im1_shape=self.shape_crop_im1, method_subpix=self.params.piv0.method_subpix, nsubpix=self.params.piv0.nsubpix, displacement_max=self.params.piv0.displacement_max, particle_radius=self.params.piv0.particle_radius, nb_peaks_to_search=self.params.piv0.nb_peaks_to_search, )
[docs] def _prepare_with_image(self, im0=None, imshape=None): """Initialize the object with an image.""" if imshape is None: imshape = im0.shape self.imshape0 = (len_y, len_x) = imshape scim = self.shape_crop_im0 stepy = scim[0] - int(np.round(self.overlap * scim[0])) stepx = scim[1] - int(np.round(self.overlap * scim[1])) assert stepy >= 1 assert stepx >= 1 ixvec_max = len_x - self._stop_for_crop0[1] ixvecs = np.arange(self._start_for_crop0[1], ixvec_max, stepx, dtype=int) iyvec_max = len_y - self._stop_for_crop0[0] iyvecs = np.arange(self._start_for_crop0[0], iyvec_max, stepy, dtype=int) # There are some cases for which it is worth to add the last points... if ixvec_max - ixvecs[-1] > stepx // 2.5: # print('add another point (x)') ixvecs = np.append(ixvecs, ixvec_max) if iyvec_max - iyvecs[-1] > stepy // 2.5: # print('add another point (y)') iyvecs = np.append(iyvecs, iyvec_max) self.ixvecs = ixvecs self.iyvecs = iyvecs ixvecs, iyvecs = np.meshgrid(ixvecs, iyvecs) self.ixvecs_grid = ixvecs.flatten() self.iyvecs_grid = iyvecs.flatten()
[docs] def calcul(self, couple): """Calcul the PIV (one pass) from a couple of images.""" if isinstance(couple, SerieOfArraysFromFiles): couple = ArrayCouple(serie=couple) elif isinstance(couple, dict): couple = ArrayCouple(**couple) if not isinstance(couple, ArrayCouple): raise ValueError( f"not isinstance(couple, ArrayCouple), {type(couple) = }" ) couple.apply_mask(self.params.mask) im0, im1 = couple.get_arrays() if not hasattr(self, "ixvecs_grid"): self._prepare_with_image(im0) ( deltaxs, deltays, xs, ys, correls_max, correls, errors, secondary_peaks, ) = self._loop_vectors(im0, im1) xs, ys = self._xyoriginalimage_from_xymasked(xs, ys) result = HeavyPIVResults( deltaxs, deltays, xs, ys, errors, correls_max=correls_max, correls=correls, couple=deepcopy(couple), params=self.params, secondary_peaks=secondary_peaks, ) self._complete_result(result) return result
def _complete_result(self, result): result.indices_no_displacement = self.correl.get_indices_no_displacement() result.displacement_max = self.correl.displacement_max
[docs] def _pad_images(self, im0, im1): """Pad images with zeros. .. todo:: Choose correctly the variable npad. """ npad = self.npad = max(self._start_for_crop0 + self._stop_for_crop0) tmp = [(npad, npad), (npad, npad)] im0pad = np.pad(im0 - im0.min(), tmp, "constant", constant_values=0) im1pad = np.pad(im1 - im1.min(), tmp, "constant", constant_values=0) return im0pad, im1pad
[docs] def _calcul_positions_vectors_subimages( self, deltaxs_input=None, deltays_input=None ): """Calcul the indices corresponding to the vectors and cropped windows. Returns ------- xs : np.array x index of the position of the computed vector in the original images. ys : np.array y index of the position of the computed vector in the original images. ixs0_pad : np.array x index of the center of the crop image 0 in the padded image 0. iys0_pad : np.array y index of the center of the crop image 0 in the padded image 0. ixs1_pad : np.array x index of the center of the crop image 1 in the padded image 1. iys1_pad : np.array y index of the center of the crop image 1 in the padded image 1. """ # not used in this method del deltaxs_input, deltays_input ixs0_pad = self.ixvecs_grid + self.npad iys0_pad = self.iyvecs_grid + self.npad ixs1_pad = ixs0_pad iys1_pad = iys0_pad return ( self.ixvecs_grid, self.iyvecs_grid, ixs0_pad, iys0_pad, ixs1_pad, iys1_pad, )
[docs] def _loop_vectors(self, im0, im1, deltaxs_input=None, deltays_input=None): """Loop over the vectors to compute them.""" im0pad, im1pad = self._pad_images(im0, im1) xs, ys, ixs0_pad, iys0_pad, ixs1_pad, iys1_pad = ( self._calcul_positions_vectors_subimages(deltaxs_input, deltays_input) ) nb_vec = len(xs) correls = [None] * nb_vec errors = {} deltaxs = np.empty(xs.shape, dtype="float32") deltays = np.empty_like(deltaxs) correls_max = np.empty_like(deltaxs) secondary_peaks = [None] * nb_vec has_to_apply_subpix = self.index_pass == self.params.multipass.number - 1 for ivec in range(nb_vec): ixvec0 = ixs0_pad[ivec] iyvec0 = iys0_pad[ivec] ixvec1 = ixs1_pad[ivec] iyvec1 = iys1_pad[ivec] im0crop = self._crop_im0(ixvec0, iyvec0, im0pad) im1crop = self._crop_im1(ixvec1, iyvec1, im1pad) if ( im0crop.shape != self.shape_crop_im0 or im1crop.shape != self.shape_crop_im1 ): print( "Warning: Bad im_crop shape.", ixvec0, iyvec0, ixvec1, iyvec1, im0crop.shape, self.shape_crop_im0, im1crop.shape, self.shape_crop_im1, ) deltaxs[ivec] = np.nan deltays[ivec] = np.nan correls_max[ivec] = np.nan errors[ivec] = "Bad im_crop shape." continue # compute and store correlation map correl, norm = self.correl(im0crop, im1crop) correls[ivec] = correl # compute displacements corresponding to peaks try: ( deltax, deltay, correl_max, other_peaks, ) = self.correl.compute_displacements_from_correl( correl, norm=norm ) except PIVError as e: errors[ivec] = e.explanation deltaxs[ivec], deltays[ivec], correls_max[ivec] = e.results continue # increase precision on the displacement if has_to_apply_subpix: try: deltax, deltay = self.correl.apply_subpix( deltax, deltay, correl ) except PIVError as e: errors[ivec] = e.explanation deltaxs[ivec] = deltax deltays[ivec] = deltay correls_max[ivec] = correl_max secondary_peaks[ivec] = other_peaks if deltaxs_input is not None: deltaxs += deltaxs_input deltays += deltays_input return ( deltaxs, deltays, xs, ys, correls_max, correls, errors, secondary_peaks, )
[docs] def _init_crop(self): """Initialize the cropping of the images.""" scim0 = self.shape_crop_im0 scim1 = self.shape_crop_im1 self._start_for_crop0 = (scim0[0] // 2, scim0[1] // 2) _stop_for_crop0 = [scim0[0] // 2, scim0[1] // 2] if scim0[0] % 2 == 1: _stop_for_crop0[0] += 1 if scim0[1] % 2 == 1: _stop_for_crop0[1] += 1 self._stop_for_crop0 = tuple(_stop_for_crop0) self._start_for_crop1 = (scim1[0] // 2, scim1[1] // 2) _stop_for_crop1 = [scim1[0] // 2, scim1[1] // 2] if scim1[0] % 2 == 1: _stop_for_crop1[0] += 1 if scim1[1] % 2 == 1: _stop_for_crop1[1] += 1 self._stop_for_crop1 = tuple(_stop_for_crop1)
[docs] def _crop_im0(self, ixvec, iyvec, im): """Crop image 0.""" subim = im[ iyvec - self._start_for_crop0[0] : iyvec + self._stop_for_crop0[0], ixvec - self._start_for_crop0[1] : ixvec + self._stop_for_crop0[1], ] subim = np.ascontiguousarray(subim, dtype=np.float32) return subim
[docs] def _crop_im1(self, ixvec, iyvec, im): """Crop image 1.""" subim = im[ iyvec - self._start_for_crop1[0] : iyvec + self._stop_for_crop1[0], ixvec - self._start_for_crop1[1] : ixvec + self._stop_for_crop1[1], ] subim = np.ascontiguousarray(subim, dtype=np.float32) return subim
[docs] def apply_interp(self, piv_results, last=False): """Interpolate a PIV result object on the grid of the PIV work. Parameters ---------- piv_results : :class;`HeavyPIVResults` The interpolated field are added to this object. last : bool (False) Last pass or not. Notes ----- Depending on the value of `params.multipass.use_tps`, the interpolation is done with an iterative method based on the Thin Plate Spline method (:class:`fluidimage.calcul.interpolate.thin_plate_spline_subdom.ThinPlateSplineSubdom`) or done with a simple griddata method (much faster). """ if not last and not hasattr(piv_results, "ixvecs_approx"): ( piv_results.ixvecs_approx, piv_results.iyvecs_approx, ) = self._xyoriginalimage_from_xymasked( self.ixvecs_grid, self.iyvecs_grid ) if last and not hasattr(piv_results, "ixvecs_final"): ( piv_results.ixvecs_final, piv_results.iyvecs_final, ) = self._xyoriginalimage_from_xymasked( self.ixvecs_grid, self.iyvecs_grid ) # for the interpolation selection = ~( np.isnan(piv_results.deltaxs) | np.isnan(piv_results.deltays) ) if not any(selection): raise InterpError("Only nan.") xs = piv_results.xs[selection] ys = piv_results.ys[selection] xs_tmp, ys_tmp = self._xymasked_from_xyoriginalimage(xs, ys) centers = np.vstack([ys_tmp, xs_tmp]) deltaxs = piv_results.deltaxs[selection] deltays = piv_results.deltays[selection] if ( self.params.multipass.use_tps is True or self.params.multipass.use_tps == "last" and last ): # compute TPS coef smoothing_coef = self.params.multipass.smoothing_coef subdom_size = self.params.multipass.subdom_size threshold = self.params.multipass.threshold_tps print( f"TPS interpolation ({piv_results.couple.name}, " f"{smoothing_coef=}, {threshold=})." ) tps = ThinPlateSplineSubdom( centers, subdom_size, smoothing_coef, threshold=threshold, percent_buffer_area=20, ) try: ( deltaxs_smooth, deltaxs_tps, summary, ) = tps.compute_tps_weights_subdom(deltaxs) ( deltays_smooth, deltays_tps, summary, ) = tps.compute_tps_weights_subdom(deltays) except np.linalg.LinAlgError: print("LinAlgError in TPS: compute delta_approx with griddata") deltaxs_approx = griddata( centers[::-1], deltaxs, (self.ixvecs, self.iyvecs) ) deltays_approx = griddata( centers[::-1], deltays, (self.ixvecs, self.iyvecs) ) else: piv_results.deltaxs_tps = deltaxs_tps piv_results.deltays_tps = deltays_tps new_positions = np.vstack([self.iyvecs_grid, self.ixvecs_grid]) tps.init_with_new_positions(new_positions) deltaxs_approx = tps.interpolate(deltaxs_tps) deltays_approx = tps.interpolate(deltays_tps) if last: xs_smooth = piv_results.ixvecs_final.copy().astype(np.float32) xs_smooth[selection] = xs ys_smooth = piv_results.iyvecs_final.copy().astype(np.float32) ys_smooth[selection] = ys deltaxs_smooth_full = deltaxs_approx.copy() deltaxs_smooth_full[selection] = deltaxs_smooth deltays_smooth_full = deltays_approx.copy() deltays_smooth_full[selection] = deltays_smooth piv_results.xs_smooth = xs_smooth piv_results.ys_smooth = ys_smooth piv_results.deltaxs_smooth = deltaxs_smooth_full piv_results.deltays_smooth = deltays_smooth_full print("TPS summary: ", end="") nb_fixed_vectors_tot = summary.pop("nb_fixed_vectors_tot") if nb_fixed_vectors_tot > 0: print(f'{nb_fixed_vectors_tot} "erratic" vectors changed.') else: print("no erratic vector found.") print(f" Statistics on the {tps.nb_subdom} TPS subdomains:") for key, value in summary.items(): fmt = ".3f" if isinstance(value[0], float) else "d" print( f" {{:18s}}: min={{:{fmt}}}; max={{:{fmt}}}; ".format( key, min(value), max(value) ) + f"mean={sum(value)/len(value):.3f}" ) else: deltaxs_approx = griddata( centers[::-1], deltaxs, (self.ixvecs, self.iyvecs) ) deltays_approx = griddata( centers[::-1], deltays, (self.ixvecs, self.iyvecs) ) if last: piv_results.deltaxs_final = deltaxs_approx piv_results.deltays_final = deltays_approx else: piv_results.deltaxs_approx = deltaxs_approx piv_results.deltays_approx = deltays_approx
[docs]class FirstWorkPIV(BaseWorkPIV): """First PIV pass (without input displacements). Parameters ---------- params : ParamContainer ParamContainer object produced by the function :func:`fluidimage.works.piv.multipass.WorkPIV.create_default_params`. """ index_pass = 0
[docs] @classmethod def _complete_params_with_default(cls, params): params._set_child( "piv0", attribs={ "shape_crop_im0": 48, "shape_crop_im1": None, "displacement_max": None, "displacement_mean": None, # NotImplemented "method_correl": "fftw", "method_subpix": "2d_gaussian2", "nsubpix": None, "nb_peaks_to_search": 1, "particle_radius": 3, }, ) params.piv0._set_doc( """Parameters describing one PIV step. - `shape_crop_im0` : int (48) Shape of the cropped images 0 from which are computed the correlation. - `shape_crop_im1` : int or None Shape of the cropped images 0 (has to be None for correl based on fft). - displacement_max : None Displacement maximum used in correlation classes. The exact effect depends on the correlation method. For fft based correlation, it can also be of the form '50%' and then the maximum displacement is computed for each pass as a pourcentage of max(shape_crop_im0). - displacement_mean : None Displacement averaged over space (NotImplemented). - method_correl : str, default 'fftw' Can be in """ + str(list(correlation_classes.keys())) + """ - method_subpix : str, default '2d_gaussian2' Can be in """ + str(SubPix.methods) + """ - nsubpix : None Integer used in the subpix finder to compute the shape of the correlation crop (`(1+2*nsubpix,)*2`). It is related to the typical size of the particles. It has to be increased in case of peak locking (plot the histograms of the displacements). - nb_peaks_to_search : 1, int Number of peaks to search. Secondary peaks can be used during the fix step. - particle_radius : 3, int Typical radius of a particle (or more precisely of a correlation peak). Used only if `nb_peaks_to_search` is larger than one. """ ) params.piv0._set_child( "grid", attribs={"overlap": 0.5, "from": "overlap"} ) params.piv0.grid._set_doc( """ Parameters describing the grid. - overlap : float (0.5) Number smaller than 1 defining the overlap between interrogation windows. - from : str {'overlap'} Keyword for the method from which is computed the grid. """ ) cls._complete_params_with_default_mask(params)
def __init__(self, params): self.params = params self.overlap = params.piv0.grid.overlap if self.overlap >= 1: raise ValueError("params.piv0.grid.overlap has to be smaller than 1") self._init_shape_crop( params.piv0.shape_crop_im0, params.piv0.shape_crop_im1 ) self._init_crop() self._init_correl()
[docs]class WorkPIVFromDisplacement(BaseWorkPIV): """Work PIV working from already computed displacement (for multipass). Parameters ---------- params : ParamContainer ParamContainer object produced by the function :func:`fluidimage.works.piv.multipass.WorkPIV.create_default_params`. index_pass : int, 1 shape_crop_im0 : int or tuple, optional shape_crop_im1 : int or tuple, optional Notes ----- Steps for the PIV computation: - prepare the work PIV with an image if not already done (grid), - apply interpolation (TPS or griddata) to compute a estimation of the displacements, - loop over vectors to compute displacements. """ def __init__( self, params, index_pass=1, shape_crop_im0=None, shape_crop_im1=None ): self.params = params self.index_pass = index_pass self.overlap = params.piv0.grid.overlap if shape_crop_im0 is None: shape_crop_im0 = params.piv0.shape_crop_im0 if shape_crop_im1 is None: shape_crop_im1 = params.piv0.shape_crop_im1 self._init_shape_crop(shape_crop_im0, shape_crop_im1) self._init_crop() self._init_correl()
[docs] def calcul(self, piv_results): """Calcul the PIV (one pass) from a couple of images and displacement. .. todo:: Use the derivatives of the velocity to distort the image 1. """ if not isinstance(piv_results, HeavyPIVResults): raise ValueError couple = piv_results.couple im0, im1 = couple.get_arrays() if not hasattr(self, "ixvecs_grid"): self._prepare_with_image(im0) self.apply_interp(piv_results) deltaxs_input = np.round(piv_results.deltaxs_approx).astype("int32") deltays_input = np.round(piv_results.deltays_approx).astype("int32") ( deltaxs, deltays, xs, ys, correls_max, correls, errors, secondary_peaks, ) = self._loop_vectors(im0, im1, deltaxs_input, deltays_input) xs, ys = self._xyoriginalimage_from_xymasked(xs, ys) result = HeavyPIVResults( deltaxs, deltays, xs, ys, errors, correls_max=correls_max, correls=correls, couple=deepcopy(couple), params=self.params, secondary_peaks=secondary_peaks, ) self._complete_result(result) result.deltaxs_input = deltaxs_input result.deltays_input = deltays_input return result
[docs] def _calcul_positions_vectors_subimages( self, deltaxs_input=None, deltays_input=None ): """Calcul the indices corresponding to the vectors and cropped windows. Returns ------- xs : np.array x index of the position of the computed vector in the original images. ys : np.array y index of the position of the computed vector in the original images. ixs0_pad : np.array x index of the center of the crop image 0 in the padded image 0. iys0_pad : np.array y index of the center of the crop image 0 in the padded image 0. ixs1_pad : np.array x index of the center of the crop image 1 in the padded image 1. iys1_pad : np.array y index of the center of the crop image 1 in the padded image 1. """ ixs0 = self.ixvecs_grid - deltaxs_input // 2 iys0 = self.iyvecs_grid - deltays_input // 2 ixs1 = ixs0 + deltaxs_input iys1 = iys0 + deltays_input # if a point is outside an image => shift of subimages used # for correlation ind_outside = np.argwhere( (ixs0 > self.imshape0[1]) | (ixs0 < 0) | (ixs1 > self.imshape0[1]) | (ixs1 < 0) | (iys0 > self.imshape0[0]) | (iys0 < 0) | (iys1 > self.imshape0[0]) | (iys1 < 0) ) for ind in ind_outside: if ( (ixs1[ind] > self.imshape0[1]) or (iys1[ind] > self.imshape0[0]) or (ixs1[ind] < 0) or (iys1[ind] < 0) ): ixs0[ind] = self.ixvecs_grid[ind] - deltaxs_input[ind] iys0[ind] = self.iyvecs_grid[ind] - deltays_input[ind] ixs1[ind] = self.ixvecs_grid[ind] iys1[ind] = self.iyvecs_grid[ind] elif ( (ixs0[ind] > self.imshape0[1]) or (iys0[ind] > self.imshape0[0]) or (ixs0[ind] < 0) or (iys0[ind] < 0) ): ixs0[ind] = self.ixvecs_grid[ind] iys0[ind] = self.iyvecs_grid[ind] ixs1[ind] = self.ixvecs_grid[ind] + deltaxs_input[ind] iys1[ind] = self.iyvecs_grid[ind] + deltays_input[ind] xs = (ixs0 + ixs1) / 2.0 ys = (iys0 + iys1) / 2.0 ixs0_pad = ixs0 + self.npad iys0_pad = iys0 + self.npad ixs1_pad = ixs1 + self.npad iys1_pad = iys1 + self.npad return xs, ys, ixs0_pad, iys0_pad, ixs1_pad, iys1_pad