"""Correlation classes
======================
The correlation classes are able to compute correlations with
different methods.
.. autoclass:: CorrelBase
:members:
:private-members:
.. autoclass:: CorrelScipySignal
:members:
:private-members:
.. autoclass:: CorrelScipyNdimage
:members:
:private-members:
.. autoclass:: CorrelPythran
:members:
:private-members:
.. autoclass:: CorrelPyCuda
:members:
:private-members:
.. autoclass:: CorrelFFTNumpy
:members:
:private-members:
.. autoclass:: CorrelFFTW
:members:
:private-members:
.. autoclass:: CorrelCuFFT
:members:
:private-members:
.. autoclass:: CorrelSKCuFFT
:members:
:private-members:
"""
from abc import ABC, abstractmethod
from math import sqrt
import numpy as np
from scipy.ndimage import correlate
from scipy.signal import correlate2d
from transonic import Array, Type, boost
from .correl_pycuda import correl_pycuda
from .errors import PIVError
from .fft import (
CUFFT2DReal2Complex,
FFTW2DReal2Complex,
NumpyFFT2DReal2Complex,
SKCUFFT2DReal2Complex,
)
from .subpix import SubPix
def compute_indices_from_displacement(dx, dy, indices_no_displ):
return indices_no_displ[1] - dx, indices_no_displ[0] - dy
def parse_displacement_max(displ_max, im0_shape):
if isinstance(displ_max, str) and displ_max.endswith("%"):
return float(displ_max[:-1]) / 100 * max(im0_shape)
else:
return displ_max
A2dC = Array[Type(np.float32, np.float64), "2d", "C"]
A2df32 = "float32[][]"
def _is_there_a_nan(arr):
arr = arr.ravel()
for idx in range(9):
if np.isnan(arr[idx]):
return True
return False
@boost
def nan_indices_max(
correl: A2dC,
i0_start: np.int32,
i0_stop: np.int32,
i1_start: np.int32,
i1_stop: np.int32,
):
correl_max = np.nan
# first, get the first non nan value
n0, n1 = correl.shape
correl_flatten = correl.ravel()
for i_flat in range(i0_start * n1 + i1_start, n0 * n1):
value = correl_flatten[i_flat]
if not np.isnan(value):
correl_max = value
break
assert not np.isnan(correl_max)
i0_max = 0
i1_max = 0
for i0 in range(i0_start, i0_stop):
for i1 in range(i1_start, i1_stop):
value = correl[i0, i1]
if np.isnan(value):
continue
if value >= correl_max:
correl_max = value
i0_max = i0
i1_max = i1
error_message = ""
i0, i1 = i0_max, i1_max
if i0 == 0 or i0 == n0 - 1 or i1 == 0 or i1 == n1 - 1:
error_message = "Correlation peak touching boundary."
elif _is_there_a_nan(correl[i0 - 1 : i0 + 2, i1 - 1 : i1 + 2]):
error_message = "Correlation peak touching nan."
return i0_max, i1_max, error_message
def _compute_indices_max(
correl, norm, start_stop_for_search0, start_stop_for_search1
):
"""Compute the indices of the maximum correlation
Warning: important for perf, so use Pythran
"""
i0_start, i0_stop = start_stop_for_search0
i1_start, i1_stop = start_stop_for_search1
if i0_stop is None:
i0_stop, i1_stop = correl.shape
iy, ix, error_message = nan_indices_max(
correl, i0_start, i0_stop, i1_start, i1_stop
)
if norm == 0:
# I hope it is ok (Pierre)
correl_max = 0.0
else:
correl_max = correl[iy, ix] / norm
if error_message:
error = PIVError(explanation=error_message)
error.results = (ix, iy, correl_max)
raise error
return ix, iy, correl_max
[docs]class CorrelBase(ABC):
"""This class is meant to be subclassed, not instantiated directly."""
_tag = "base"
def __init__(
self,
im0_shape,
im1_shape,
method_subpix="centroid",
nsubpix=1,
displacement_max=None,
particle_radius=3,
nb_peaks_to_search=1,
mode=None,
):
self.mode = mode
self.subpix = SubPix(method=method_subpix, nsubpix=nsubpix)
self.im0_shape = im0_shape
self.im1_shape = im1_shape
self.iy0, self.ix0 = (i // 2 - 1 for i in im0_shape)
self.displacement_max = parse_displacement_max(
displacement_max, im0_shape
)
self.particle_radius = particle_radius
self.nb_peaks_to_search = nb_peaks_to_search
self.start_stop_for_search0 = [0, None]
self.start_stop_for_search1 = [0, None]
self._finalize_init()
@abstractmethod
def __call__(self, im0, im1):
"""Compute the correlation from 2 images."""
[docs] def _finalize_init(self):
"""Finalize initialization"""
[docs] def compute_displacement_from_indices(self, ix, iy):
"""Compute the displacement from a couple of indices."""
return self.ix0 - ix, self.iy0 - iy
[docs] def compute_indices_from_displacement(self, dx, dy):
"""Compute the indices corresponding to a displacement"""
return self.ix0 - dx, self.iy0 - dy
[docs] def get_indices_no_displacement(self):
"""Get the indices corresponding to no displacement"""
return self.iy0, self.ix0
def _compute_indices_max(self, correl, norm):
return _compute_indices_max(
correl, norm, self.start_stop_for_search0, self.start_stop_for_search1
)
[docs] def compute_displacements_from_correl(self, correl, norm=1.0):
"""Compute the displacement from a correlation."""
try:
ix, iy, correl_max = self._compute_indices_max(correl, norm)
except PIVError as piv_error:
ix, iy, correl_max = piv_error.results
# second chance to find a better peak...
correl[
iy - self.particle_radius : iy + self.particle_radius + 1,
ix - self.particle_radius : ix + self.particle_radius + 1,
] = np.nan
try:
ix2, iy2, correl_max2 = self._compute_indices_max(correl, norm)
except PIVError as _piv_error:
dx, dy = self.compute_displacement_from_indices(ix, iy)
_piv_error.results = (dx, dy, correl_max)
raise _piv_error
else:
ix, iy, correl_max = ix2, iy2, correl_max2
dx, dy = self.compute_displacement_from_indices(ix, iy)
if self.nb_peaks_to_search == 1:
other_peaks = None
elif self.nb_peaks_to_search >= 1:
correl = correl.copy()
other_peaks = []
for _ in range(0, self.nb_peaks_to_search - 1):
correl[
iy - self.particle_radius : iy + self.particle_radius + 1,
ix - self.particle_radius : ix + self.particle_radius + 1,
] = np.nan
try:
ix, iy, correl_max_other = self._compute_indices_max(
correl, norm
)
except PIVError:
break
dx_other, dy_other = self.compute_displacement_from_indices(
ix, iy
)
other_peaks.append((dx_other, dy_other, correl_max_other))
# print('found {} peaks'.format(len(other_peaks) + 1))
else:
raise ValueError
return dx, dy, correl_max, other_peaks
[docs] def apply_subpix(self, dx, dy, correl):
"""Compute the displacement with the subpix method."""
ix, iy = self.compute_indices_from_displacement(dx, dy)
ix, iy = self.subpix.compute_subpix(correl, ix, iy)
return self.compute_displacement_from_indices(ix, iy)
@boost
def correl_numpy(im0: A2df32, im1: A2df32, disp_max: int):
"""Correlations by hand using only numpy.
Parameters
----------
im0, im1 : images
input images : 2D matrix
disp_max : int
displacement max.
Notes
-----
im1_shape inf to im0_shape
Returns
-------
the computing correlation (size of computed correlation = disp_max*2 + 1)
"""
norm = np.sqrt(np.sum(im1**2) * np.sum(im0**2))
ny = nx = int(disp_max) * 2 + 1
ny0, nx0 = im0.shape
ny1, nx1 = im1.shape
zero = np.float32(0.0)
correl = np.empty((ny, nx), dtype=np.float32)
for xiy in range(disp_max + 1):
dispy = -disp_max + xiy
nymax = ny1 + min(ny0 // 2 - ny1 // 2 + dispy, 0)
ny1dep = -min(ny0 // 2 - ny1 // 2 + dispy, 0)
ny0dep = max(0, ny0 // 2 - ny1 // 2 + dispy)
for xix in range(disp_max + 1):
dispx = -disp_max + xix
nxmax = nx1 + min(nx0 // 2 - nx1 // 2 + dispx, 0)
nx1dep = -min(nx0 // 2 - nx1 // 2 + dispx, 0)
nx0dep = max(0, nx0 // 2 - nx1 // 2 + dispx)
tmp = zero
for iy in range(nymax):
for ix in range(nxmax):
tmp += (
im1[iy + ny1dep, ix + nx1dep]
* im0[ny0dep + iy, nx0dep + ix]
)
correl[xiy, xix] = tmp / (nxmax * nymax)
for xix in range(disp_max):
dispx = xix + 1
nxmax = nx1 - max(nx0 // 2 + nx1 // 2 + dispx - nx0, 0)
nx1dep = 0
nx0dep = nx0 // 2 - nx1 // 2 + dispx
tmp = zero
for iy in range(nymax):
for ix in range(nxmax):
tmp += (
im1[iy + ny1dep, ix + nx1dep]
* im0[ny0dep + iy, nx0dep + ix]
)
correl[xiy, xix + disp_max + 1] = tmp / (nxmax * nymax)
for xiy in range(disp_max):
dispy = xiy + 1
nymax = ny1 - max(ny0 // 2 + ny1 // 2 + dispy - ny0, 0)
ny1dep = 0
ny0dep = ny0 // 2 - ny1 // 2 + dispy
for xix in range(disp_max + 1):
dispx = -disp_max + xix
nxmax = nx1 + min(nx0 // 2 - nx1 // 2 + dispx, 0)
nx1dep = -min(nx0 // 2 - nx1 // 2 + dispx, 0)
nx0dep = max(0, nx0 // 2 - nx1 // 2 + dispx)
tmp = zero
for iy in range(nymax):
for ix in range(nxmax):
tmp += (
im1[iy + ny1dep, ix + nx1dep]
* im0[ny0dep + iy, nx0dep + ix]
)
correl[xiy + disp_max + 1, xix] = tmp / (nxmax * nymax)
for xix in range(disp_max):
dispx = xix + 1
nxmax = nx1 - max(nx0 // 2 + nx1 // 2 + dispx - nx0, 0)
nx1dep = 0
nx0dep = nx0 // 2 - nx1 // 2 + dispx
tmp = zero
for iy in range(nymax):
for ix in range(nxmax):
tmp += (
im1[iy + ny1dep, ix + nx1dep]
* im0[ny0dep + iy, nx0dep + ix]
)
correl[xiy + disp_max + 1, xix + disp_max + 1] = tmp / (nxmax * nymax)
correl = correl * im1.size
return correl, norm
[docs]class CorrelPythran(CorrelBase):
"""Correlation computed "by hands" with Numpy and Pythran"""
_tag = "pythran"
[docs] def _finalize_init(self):
if self.displacement_max is None:
if self.im0_shape == self.im1_shape:
displacement_max = min(self.im0_shape) // 2 - 1
else:
displacement_max = (
min(
self.im0_shape[0] - self.im1_shape[0],
self.im0_shape[1] - self.im1_shape[1],
)
// 2
- 1
)
if displacement_max <= 0:
raise ValueError(
"displacement_max <= 0 : problem with images shapes?"
)
self.displacement_max = displacement_max
self.ix0 = displacement_max
self.iy0 = displacement_max
def __call__(self, im0, im1):
"""Compute the correlation from images."""
return correl_numpy(im0, im1, self.displacement_max)
[docs]class CorrelPyCuda(CorrelBase):
"""Correlation using pycuda.
Correlation class by hands with cuda.
"""
_tag = "pycuda"
[docs] def _finalize_init(self):
if self.displacement_max is None:
if self.im0_shape == self.im1_shape:
displacement_max = min(self.im0_shape) // 2
else:
displacement_max = (
min(
self.im0_shape[0] - self.im1_shape[0],
self.im0_shape[1] - self.im1_shape[1],
)
// 2
- 1
)
if displacement_max <= 0:
raise ValueError(
"displacement_max <= 0 : problem with images shapes?"
)
self.displacement_max = displacement_max
self.ix0 = displacement_max
self.iy0 = displacement_max
def __call__(self, im0, im1):
"""Compute the correlation from images."""
correl, norm = correl_pycuda(im0, im1, self.displacement_max)
# ???
# self._add_info_to_correl(correl)
return correl, norm
[docs]class CorrelScipySignal(CorrelBase):
"""Correlations using scipy.signal.correlate2d"""
_tag = "scipy.signal"
[docs] def _finalize_init(self):
if self.mode is None:
self.mode = "same"
modes = ["valid", "same"]
if self.mode not in modes:
raise ValueError("mode should be in " + modes)
if self.mode == "same":
ny, nx = self.im0_shape
if nx % 2 == 0:
ind0x = nx // 2 - 1
else:
ind0x = nx // 2
if ny % 2 == 0:
ind0y = ny // 2 - 1
else:
ind0y = ny // 2
else:
ny, nx = np.array(self.im0_shape) - np.array(self.im1_shape)
ind0x = nx // 2
ind0y = ny // 2
self.iy0, self.ix0 = (ind0y, ind0x)
def __call__(self, im0, im1):
"""Compute the correlation from images."""
norm = _norm_images(im0, im1)
if self.mode == "valid":
correl = correlate2d(im0, im1, mode="valid")
elif self.mode == "same":
correl = correlate2d(im0, im1, mode="same", fillvalue=im1.min())
else:
assert False, "Bad value for self.mode"
return correl, norm
[docs]class CorrelScipyNdimage(CorrelBase):
"""Correlations using scipy.ndimage.correlate."""
_tag = "scipy.ndimage"
[docs] def _finalize_init(self):
self.iy0, self.ix0 = (i // 2 for i in self.im0_shape)
def __call__(self, im0, im1):
"""Compute the correlation from images."""
norm = np.sum(im1**2)
correl = correlate(im0, im1, mode="constant", cval=im1.min())
return correl, norm
class CorrelFFTBase(CorrelBase):
"""Correlations using fft."""
_tag = "fft.base"
def _finalize_init(self):
if self.displacement_max is not None:
where_large_displacement = np.zeros(self.im0_shape, dtype=bool)
for indices, v in np.ndenumerate(where_large_displacement):
dx, dy = self.compute_displacement_from_indices(*indices[::-1])
displacement = sqrt(dx**2 + dy**2)
if displacement > self.displacement_max:
where_large_displacement[indices] = True
self.where_large_displacement = where_large_displacement
n0, n1 = where_large_displacement.shape
for i0_start in range(n0):
if not all(where_large_displacement[i0_start, :]):
break
for i1_start in range(n1):
if not all(where_large_displacement[:, i1_start]):
break
for i0_stop in range(n0 - 1, -1, -1):
if not all(where_large_displacement[i0_stop, :]):
break
i0_stop += 1
for i1_stop in range(n1 - 1, -1, -1):
if not all(where_large_displacement[:, i1_stop]):
break
i1_stop += 1
self.start_stop_for_search0 = (i0_start, i0_stop)
self.start_stop_for_search1 = (i1_start, i1_stop)
self._check_im_shape(self.im0_shape, self.im1_shape)
def _check_im_shape(self, im0_shape, im1_shape):
if im1_shape is None:
im1_shape = im0_shape
if im0_shape != im1_shape:
raise ValueError(
"with this correlation method the input images "
"have to have the same shape."
)
def compute_displacements_from_correl(self, correl, norm=1.0):
"""Compute the displacement (with subpix) from a correlation."""
if self.displacement_max is not None:
correl = correl.copy()
correl[self.where_large_displacement] = np.nan
return super().compute_displacements_from_correl(correl, norm=norm)
@boost
def _norm_images(im0: A2df32, im1: A2df32):
"""Less accurate than the numpy equivalent but much faster
Should return something close to:
sqrt(np.sum(im1**2) * np.sum(im0**2))
"""
im0 = im0.ravel()
im1 = im1.ravel()
tmp0 = np.float64(im0[0] ** 2)
tmp1 = np.float64(im1[0] ** 2)
if im0.size != im1.size:
for idx in range(1, im0.size):
tmp0 += im0[idx] ** 2
for idx in range(1, im1.size):
tmp1 += im1[idx] ** 2
else:
for idx in range(1, im0.size):
tmp0 += im0[idx] ** 2
tmp1 += im1[idx] ** 2
return sqrt(tmp0 * tmp1)
@boost
def _like_fftshift(arr: A2dC):
"""Pythran optimized function doing the equivalent of
np.ascontiguousarray(np.fft.fftshift(arr[::-1, ::-1]))
"""
n0, n1 = arr.shape
arr = np.ascontiguousarray(arr[::-1, ::-1])
tmp = np.empty_like(arr)
if n1 % 2 == 0:
for i0 in range(n0):
for i1 in range(n1 // 2):
tmp[i0, n1 // 2 + i1] = arr[i0, i1]
tmp[i0, i1] = arr[i0, n1 // 2 + i1]
else:
for i0 in range(n0):
for i1 in range(n1 // 2 + 1):
tmp[i0, n1 // 2 + i1] = arr[i0, i1]
for i1 in range(n1 // 2):
tmp[i0, i1] = arr[i0, n1 // 2 + 1 + i1]
arr_1d_view = arr.ravel()
tmp_1d_view = tmp.ravel()
if n0 % 2 == 0:
n_half = (n0 // 2) * n1
for idx in range(n_half):
arr_1d_view[idx + n_half] = tmp_1d_view[idx]
arr_1d_view[idx] = tmp_1d_view[idx + n_half]
else:
n_half_a = (n0 // 2 + 1) * n1
n_half_b = (n0 // 2) * n1
for idx in range(n_half_a):
arr_1d_view[idx + n_half_b] = tmp_1d_view[idx]
for idx in range(n_half_b):
arr_1d_view[idx] = tmp_1d_view[idx + n_half_a]
return arr
class CorrelFFTWithOperBase(CorrelFFTBase):
FFTClass: object
def _finalize_init(self):
CorrelFFTBase._finalize_init(self)
n0, n1 = self.im0_shape
self.oper = self.FFTClass(n1, n0)
def __call__(self, im0, im1):
"""Compute the correlation from images.
Warning: important for perf, so use Pythran
"""
fft_im0 = self.oper.fft(im0)
fft_im0[0, 0] = 0.0
fft_im1 = self.oper.fft(im1)
fft_im1[0, 0] = 0.0
energy0 = self.oper.compute_energy_from_fourier(fft_im0)
energy1 = self.oper.compute_energy_from_fourier(fft_im1)
norm = sqrt(4 * energy0 * energy1) * self.oper.coef_norm_correl
correl = self.oper.ifft(fft_im0.conj() * fft_im1)
return _like_fftshift(correl), norm
[docs]class CorrelFFTNumpy(CorrelFFTWithOperBase):
"""Correlations using numpy.fft."""
FFTClass = NumpyFFT2DReal2Complex
_tag = "np.fft"
def __call__(self, im0, im1):
"""Compute the correlation from images.
Warning: important for perf, so use Pythran
"""
fft_im0 = self.oper.fft(im0)
fft_im0[0, 0] = 0.0
fft_im1 = self.oper.fft(im1)
fft_im1[0, 0] = 0.0
correl = self.oper.ifft(fft_im0.conj() * fft_im1).real
energy0 = self.oper.compute_energy_from_fourier(fft_im0)
energy1 = self.oper.compute_energy_from_fourier(fft_im1)
norm = sqrt(4 * energy0 * energy1) * self.oper.coef_norm_correl
return _like_fftshift(np.ascontiguousarray(correl)), norm
[docs]class CorrelFFTW(CorrelFFTWithOperBase):
"""Correlations using fluidimage.fft.FFTW2DReal2Complex"""
FFTClass = FFTW2DReal2Complex
_tag = "fftw"
[docs]class CorrelCuFFT(CorrelFFTWithOperBase):
"""Correlations using fluidimage.fft.CUFFT2DReal2Complex"""
_tag = "cufft"
FFTClass = CUFFT2DReal2Complex
[docs]class CorrelSKCuFFT(CorrelFFTWithOperBase):
"""Correlations using fluidimage.fft.FFTW2DReal2Complex"""
FFTClass = SKCUFFT2DReal2Complex
_tag = "skcufft"
correlation_classes = {
v._tag: v
for k, v in locals().items()
if k.startswith("Correl") and not k.endswith("Base")
}