Source code for fluidimage.works.piv.fix
"""Fix work
===========
.. autoclass:: WorkFIX
:members:
:private-members:
"""
from math import sqrt
import numpy as np
from fluiddyn.util.paramcontainer import ParamContainer
from fluidimage.calcul.errors import PIVError
from fluidimage.calcul.mean_neighbors import mean_neighbors_xy
from .. import BaseWork
[docs]class WorkFIX(BaseWork):
"""Fix a displacement vector field."""
[docs] @classmethod
def create_default_params(cls):
params = ParamContainer(tag="params")
cls._complete_params_with_default(params)
return params
[docs] @classmethod
def _complete_params_with_default(cls, params, tag="fix"):
params._set_child(
tag,
attribs={
"correl_min": 0.2,
"threshold_diff_neighbour": 10,
"displacement_max": None,
},
)
params.fix._set_doc(
"""
Parameters indicating how are detected and processed false vectors.
- correl_min : 0.2
Vectors associated with correlation smaller than correl_min are considered
as false vectors.
- threshold_diff_neighbour : 10
Vectors for which the difference with the average vectors is larger than
`threshold_diff_neighbour` are considered as false vectors.
- displacement_max : None
Vectors larger than `displacement_max` are considered as false vectors.
"""
)
def __init__(self, params_fix, piv_work):
self.params_fix = params_fix
self.piv_work = piv_work
def calcul(self, piv_results):
# print('piv_results = ', piv_results)
deltaxs_wrong = {}
deltays_wrong = {}
deltaxs = piv_results.deltaxs
deltays = piv_results.deltays
for ierr in piv_results.errors.keys():
deltaxs_wrong[ierr] = deltaxs[ierr]
deltays_wrong[ierr] = deltays[ierr]
deltaxs[ierr] = np.nan
deltays[ierr] = np.nan
def put_to_nan(inds, explanation):
for ind in inds:
ind = int(ind)
if ind not in deltaxs_wrong:
deltaxs_wrong[ind] = deltaxs[ind]
deltays_wrong[ind] = deltays[ind]
deltaxs[ind] = np.nan
deltays[ind] = np.nan
piv_results.errors[ind] = explanation
else:
piv_results.errors[ind] += " + " + explanation
# condition correl < correl_min
inds = (piv_results.correls_max < self.params_fix.correl_min).nonzero()[0]
put_to_nan(inds, "correl < correl_min")
# condition delta2 < displacement_max2
if self.params_fix.displacement_max:
displacement_max2 = self.params_fix.displacement_max**2
delta2s = deltaxs**2 + deltays**2
with np.errstate(invalid="ignore"):
inds = (delta2s > displacement_max2).nonzero()[0]
put_to_nan(inds, "delta2 < displacement_max2")
if self.params_fix.threshold_diff_neighbour is not None:
threshold = self.params_fix.threshold_diff_neighbour
ixvecs = self.piv_work.ixvecs
iyvecs = self.piv_work.iyvecs
ixvecs, iyvecs = self.piv_work._xyoriginalimage_from_xymasked(
ixvecs, iyvecs
)
dxs_neighbors, dys_neighbors = mean_neighbors_xy(
deltaxs, deltays, iyvecs, ixvecs
)
diff_x = dxs_neighbors - deltaxs
diff_y = dys_neighbors - deltays
differences2 = diff_x**2 + diff_y**2
with np.errstate(invalid="ignore"):
inds = (differences2 > threshold**2).nonzero()[0]
put_to_nan(inds, "diff neighbour too large")
for ivec in inds:
piv_results.errors[
ivec
] += f" (diff = {sqrt(differences2[ivec]):.2f})"
piv_results.deltaxs_wrong = deltaxs_wrong
piv_results.deltays_wrong = deltays_wrong
if self.piv_work.params.piv0.nb_peaks_to_search < 2:
return piv_results
# condition check 2nd peak
ratio_correl_peaks = 0.6
nb_bad_peaks_replaced = 0
secondary_peaks = piv_results.secondary_peaks
for ivec, other_peaks in enumerate(secondary_peaks):
if other_peaks is None or len(other_peaks) == 0:
continue
correl0 = piv_results.correls_max[ivec]
try:
dx_input = piv_results.deltaxs_input[ivec]
dy_input = piv_results.deltays_input[ivec]
except AttributeError:
# first pass
dx_input = 0
dy_input = 0
other_peaks_good = []
for dx, dy, corr in other_peaks:
if (
corr / correl0 > ratio_correl_peaks
and corr > self.params_fix.correl_min
):
dx += dx_input
dy += dy_input
other_peaks_good.append((dx, dy, corr))
if len(other_peaks_good) == 0:
continue
diff_neighbours = np.empty(len(other_peaks_good) + 1)
diff_neighbours[0] = sqrt(differences2[ivec])
for i, (dx, dy, corr) in enumerate(other_peaks_good):
diff_neighbours[i + 1] = np.sqrt(
(dxs_neighbors[ivec] - dx) ** 2
+ (dys_neighbors[ivec] - dy) ** 2
)
argmin = diff_neighbours.argmin()
if argmin != 0:
dx, dy, corr = other_peaks_good[argmin - 1]
if (
self.params_fix.threshold_diff_neighbour is not None
and diff_neighbours[argmin]
> self.params_fix.threshold_diff_neighbour
):
continue
if (
self.params_fix.displacement_max
and dx**2 + dy**2 > self.params_fix.displacement_max
):
continue
# try to apply subpix
correl = piv_results.correls[ivec]
dx -= dx_input
dy -= dy_input
try:
dx, dy = self.piv_work.correl.apply_subpix(dx, dy, correl)
except PIVError:
continue
dx += dx_input
dy += dy_input
nb_bad_peaks_replaced += 1
# saved the replaced vector
try:
replaced_vectors = piv_results.replaced_vectors
except AttributeError:
replaced_vectors = piv_results.replaced_vectors = {}
try:
old_dx = piv_results.deltaxs_wrong[ivec]
old_dy = piv_results.deltays_wrong[ivec]
except KeyError:
old_dx = piv_results.deltaxs[ivec]
old_dy = piv_results.deltays[ivec]
replaced_vectors[ivec] = (
old_dx,
old_dy,
piv_results.correls_max[ivec],
)
# replace!
piv_results.deltaxs[ivec] = dx
piv_results.deltays[ivec] = dy
piv_results.correls_max[ivec] = corr
if ivec in piv_results.errors:
del (
piv_results.deltaxs_wrong[ivec],
piv_results.deltays_wrong[ivec],
piv_results.errors[ivec],
)
if nb_bad_peaks_replaced > 0:
print(
f"Secondary peaks: {nb_bad_peaks_replaced} bad peak(s) replaced"
)
return piv_results