"""Surface tracking
===================
.. autoclass:: WorkSurfaceTracking
:members:
:private-members:
"""
###############################################################################
# !/usr/bin/env python #
# -*- coding: utf-8 -*- #
# (C) Cyrille Bonamy, Stefan Hoerner, 2017 #
# LEGI Grenoble, University Otto-von-Guericke Magdeburg #
###############################################################################
# This program is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or #
# (at your option) any later version. #
# This program is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. #
# See the GNU General Public License for more details. #
# You should have received a copy of the GNU General Public License #
# along with this program. #
# If not, see <http://www.gnu.org/licenses/>. #
###############################################################################
# This function provides surface tracking tools, it is part of the #
# oscillating profile experiment at LEGI 2017 #
###############################################################################
import math
import sys
from pathlib import Path
import numpy as np
import scipy.interpolate
import scipy.io
from skimage.transform import resize
from fluiddyn.util.paramcontainer import ParamContainer
from fluidimage import SerieOfArraysFromFiles
from fluidimage.util import imread, logger
from . import BaseWork
[docs]class WorkSurfaceTracking(BaseWork):
"""Main work for surface tracking
Parameters
----------
params : :class:`fluiddyn.util.paramcontainer.ParamContainer`
The default parameters are obtained from the class method
:func:`WorkSurfaceTracking.create_default_params`.
"""
[docs] @classmethod
def create_default_params(cls):
"Create an object containing the default parameters (class method)."
params = ParamContainer(tag="params")
cls._complete_params_with_default(params)
return params
[docs] @classmethod
def _complete_params_with_default(cls, params):
params._set_child(
"surface_tracking",
attribs={
"xmin": 475,
"xmax": 640,
"ymin": 50,
"ymax": 700,
"distance_lens": 0.36,
"distance_object": 1.293,
"pix_size": 2.4 * 10**-4,
"startref_frame": 0,
"lastref_frame": 49,
"sur": 16,
"k_x": 40.625,
"k_y": 0,
"slicer": 4,
"red_factor": 1,
"n_frames_stock": 1,
"crop_edge": False,
"borders": 1,
"correct_pos": False,
"correct_height": False,
"offset": 0.0,
},
)
params.surface_tracking._set_doc(
"""Surface Tracking parameters:
- xmin: int (default 475)
xmin to crop the image im[xmin:xmax, ymin:ymax].
- xmax: int (default 640)
xmax to crop the image im[xmin:xmax, ymin:ymax]
- ymin: int (default 50)
ymin to crop the image im[xmin:xmax, ymin:ymax]
- ymax: int (default 700)
ymax to crop the image im[xmin:xmax, ymin:ymax]
- distance_lens: float (default 0.36)
distance in [m] lenses of camera/projetor
- distance_object: float (default 1.07)
distance in [m] camera/projector and surface
- pix_size: float (default 2.4 * 10 ** -4)
pixel size
- startref_frame: int (default 0)
indice of first reference image
- lastref_frame: int (default 49)
indice of last reference image
- sur: int (default 16)
- k_x: float (default 70.75)
wave vector oj. grid (approx. value, will set accurate later)
- k_y: float (default 0)
wave vector of the grid y-axis
- slicer: int (default 4)
cut the borders
- red_factor: int (default 1)
reduction factor to for the pixels to take tp speed up
- n_frames_stock: int (default 1)
number of frames to stock in one file
- crop_edge: boolean (default False)
searches for the structure and crops the part of the frame outside of
the structure
- borders: int (default 7)
pixel to set zero height additional to the borders if the structure was
cropped to avoid jerks
- correct_pos: boolean (default=False)
correct position of the height (necessary for large heights)
- correct_height: boolean (default=False)
correct height by a reference (provided from path_ref)
- offset: float (default 0.0)
height of the reference surface in [m] to zero level
"""
)
def __init__(self, params):
self.cpt = 0
self.params = params
self.works_surface_tracking = []
self.nameFrame = None
self.path = params.images.path
self.path_ref = params.images.path_ref
self.verify_process = False
self.ref_film = None
self.filmName = None
self.save_png = True
self.thres = 300
self.crop_edge = self.params.surface_tracking.crop_edge
self.borders = self.params.surface_tracking.borders
self.correct_pos = self.params.surface_tracking.correct_pos
self.correct_height = self.params.surface_tracking.correct_height
self.offset = self.params.surface_tracking.offset
self.xmin = self.params.surface_tracking.xmin
self.xmax = self.params.surface_tracking.xmax
self.ymin = self.params.surface_tracking.ymin
self.ymax = self.params.surface_tracking.ymax
self.distance_lens = self.params.surface_tracking.distance_lens
self.distance_object = self.params.surface_tracking.distance_object
self.pix_size = self.params.surface_tracking.pix_size
self.startref_frame = self.params.surface_tracking.startref_frame
self.lastref_frame = self.params.surface_tracking.lastref_frame
self.sur = self.params.surface_tracking.sur
self.k_y = self.params.surface_tracking.k_y
self.slicer = self.params.surface_tracking.slicer
self.red_factor = self.params.surface_tracking.red_factor
self.n_frames_stock = self.params.surface_tracking.n_frames_stock
self.plot_reduction_factor = 10
self.l_x = self.xmax - self.xmin
self.l_y = self.ymax - self.ymin
# wave_proj_pix = self.wave_proj / self.pix_size
self.kx = np.arange(-self.l_x / 2, self.l_x / 2) / self.l_x
self.ky = np.arange(-self.l_y / 2, self.l_y / 2) / self.l_y
self.refserie = SerieOfArraysFromFiles(
params.images.path_ref, params.images.str_subset_ref
)
k_x = self.compute_kx(self.refserie)
logger.info("Value of kx computed = " + str(k_x))
self.kslicer = 2 * k_x
self.wave_proj = 1 / (k_x / self.l_x / self.pix_size)
self.kxx = self.kx / self.pix_size
self.gain, self.filt = self.set_gain_filter(
k_x, self.l_y, self.l_x, self.slicer
)
self.a1_tmp = None
self.ref_height = self.process_ref()
logger.info("reference computed")
[docs] def compute_kx(self, serie):
"""calculates the average wave vector from a set of reference images
Parameters
----------
series: int
set of reference frames (arrays)
Returns
-------
wave_vector: float
average wave vector from the reference frame
"""
if len(serie) == 0:
logger.warning("0 ref image. Use of default k_x = 40.625.")
return 40.625
names = serie.get_path_arrays()
ref = np.zeros((self.ymax - self.ymin, self.xmax - self.xmin))
ii = 0
for name in names:
array = imread(str(Path(self.path_ref) / name))
frame = array[self.ymin : self.ymax, self.xmin : self.xmax].astype(
float
)
frame = self.frame_normalize(frame)
ref = ref + frame
ii += 1
self.ref = ref / ii
return self.wave_vector(
self.ref, self.ymin, self.ymax, self.xmin, self.xmax, self.sur
)
[docs] def set_gain_filter(self, k_x, l_y, l_x, slicer):
"""compute gain and filter"""
kx = np.arange(-l_x / 2, l_x / 2) / l_x
ky = np.arange(-l_y / 2, l_y / 2) / l_y
kxgrid, kygrid = np.meshgrid(kx, ky)
X, Y = np.meshgrid(kx * l_x, ky * l_y)
gain = np.exp(-1.0j * 2 * np.pi * (k_x / l_x * X))
filt1 = np.fft.fftshift(
np.exp(-((kxgrid**2 + kygrid**2) / 2 / (k_x / slicer / l_x) ** 2))
* np.exp(1 - 1 / (1 + ((kxgrid + k_x) ** 2 + kygrid**2) / k_x**2))
)
filt2 = np.fft.fftshift(
-np.exp(
-(
((kxgrid + (k_x / l_x)) ** 2 + kygrid**2)
/ 2
/ (k_x / 10 / l_x) ** 2
)
)
+ 1
)
filt3 = np.fft.fftshift(
-np.exp(
-(
((kxgrid - (k_x / l_x)) ** 2 + kygrid**2)
/ 2
/ (k_x / 10 / l_x) ** 2
)
)
+ 1
)
return gain, filt1 * filt2 * filt3
[docs] def get_borders(self, frame):
"""find the left and right border of the surface in a given frame
Parameters
----------
frame: int
frame of the high speed video (np.array)
Returns
-------
xmin: int
left border of the structure
xmax: int
right border of the structure
"""
frame_thres = 1.0 * (frame > self.thres)
a = np.argmax(frame_thres, axis=1)
xmin = np.median(a)
b = np.argmax(frame_thres[:, ::-1], axis=1)
b_med = np.median(b)
xmax = frame.shape[1] - b_med
return int(xmin), int(xmax)
[docs] def merge_cropped_frame(self, frame, x_min, x_max):
"""puts the actual frame in the reference plate frame to avoid jerks
and to keep the dimensions
Parameters
----------
frame: int
frame of the high speed video (np.array)
Returns
-------
calc_frame: int
frame of the reference size with embedded smaller structure
"""
if x_max >= self.xmax:
x_max = self.xmax - 1
print("INFO:x_max adjusted")
if x_min <= self.xmin:
x_min = self.xmin
print("INFO:x_min adjusted")
calc_frame = self.ref
calc_frame[:, x_min - self.xmin : -(self.xmax - x_max)] = frame[
self.ymin : self.ymax, x_min:x_max
]
return calc_frame
[docs] def rectify_frame(self, frame, gain, filt):
"""rectify a frame with gain and filt
Parameters
----------
frame: int
frame of the high speed video (np.array)
gain: complex array
gain for the pattern of the frame
filt: complex array
filter for the pattern of the frame gain:
Returns
-------
rectified frame: int
array of the rectified frame
"""
return np.fft.fft2(frame * gain) * filt
[docs] def frame_normalize(self, frame):
"""normalize the frame values by its mean value
Parameters
----------
frame: int
frame of the high speed video (np.array)
Returns
-------
normalized_frame: int
normalized frame of the high speed video (np.array)
"""
meanx_frame = np.mean(frame, axis=1)
for y in range(np.shape(frame)[1]):
frame[:, y] = frame[:, y] / meanx_frame
normalized_frame = frame - np.mean(frame)
return normalized_frame
[docs] def process_frame(
self, frame, ymin, ymax, xmin, xmax, gain, filt, red_factor
):
"""process a frame and return phase
Parameters
----------
frame: int array
single frame of the high speed video
xmin: int (default 475)
xmin to crop the image im[xmin:xmax, ymin:ymax]
xmax: int (default 640)
xmax to crop the image im[xmin:xmax, ymin:ymax]
ymin: int (default 50)
ymin to crop the image im[xmin:xmax, ymin:ymax]
ymax: int (default 700)
ymax to crop the image im[xmin:xmax, ymin:ymax]
gain: complex array
gain for the pattern of the frame
filt: complex array
filter for the pattern of the frame
red_factor: int(default 1)
reduction factor for the frame array to speed up the calc
Returns
-------
a: array containing phase [radians]
"""
if self.crop_edge is False:
frame = frame[ymin:ymax, xmin:xmax]
frame1 = self.frame_normalize(frame).astype(float)
frame_filtered = self.rectify_frame(frame1, gain, filt)
inversed_filt = np.fft.ifft2(frame_filtered)
inversed_filt = inversed_filt[::red_factor, ::red_factor]
a = np.unwrap(np.angle(inversed_filt), axis=1) # by lines
a = np.unwrap(a, axis=0) # by columns
return a
[docs] def process_frame_func(self, array):
"""call process_frame function with surface_tracking parameters
Parameters
----------
array_and_path : tuple containing array and path
Returns
-------
array_and_path : tuple containing array/phase [radians], frame shape
and path
"""
x_min = self.xmin
x_max = self.xmax
if self.crop_edge:
x_min, x_max = self.get_borders(array)
array = self.merge_cropped_frame(array, x_min, x_max)
shape = (x_min, x_max)
return (
self.process_frame(
array,
self.ymin,
self.ymax,
self.xmin,
self.xmax,
self.gain,
self.filt,
self.red_factor,
),
shape,
)
[docs] def calculheight_func(self, array_and_shape):
"""call convphase function with surface_tracking parameters
Parameters
----------
array_and_path : tuple containing array/phase [radians],
shape of the frame and path
Returns
-------
height_and_path : tuple containing array/height [m], frame shape
and path
"""
array, shape = array_and_shape
array_ = []
for a in array:
jumps = [
np.sign(int(a[i + 1] - angle))
for i, angle in enumerate(a[:-1])
if a[i + 1] - angle > 0.95 * np.pi
or a[i + 1] - angle < 0.95 * np.pi
]
mapper = np.zeros(len(jumps))
smoother = []
switch = 0
for i, val in enumerate(mapper):
if jumps[i] > 0:
switch = switch + 1
if jumps[i] < 0:
switch = switch - 1
if switch >= 1:
val = -2 * np.pi * switch
if switch <= -1:
val = 2 * np.pi * switch
smoother.append(val)
smoother.insert(0, 0)
array_smoothed = a + smoother
array_.append(np.array(array_smoothed))
array_s = np.array(array_)
array_s = array_s.T
for a in array_s:
jumps = [
np.sign(int(a[i + 1] - angle))
for i, angle in enumerate(a[:-1])
if a[i + 1] - angle > 0.95 * np.pi
or a[i + 1] - angle < 0.95 * np.pi
]
mapper = np.zeros(len(jumps))
smoother = []
switch = 0
for i, val in enumerate(mapper):
if jumps[i] > 0:
switch = switch + 1
if jumps[i] < 0:
switch = switch - 1
if switch >= 1:
val = -2 * np.pi
if switch <= -1:
val = 2 * np.pi
smoother.append(val)
smoother.insert(0, 0)
array_smoothed = a + smoother
array_.append(np.array(array_smoothed))
array_s = array
return (
self.convphase(
array_s,
self.pix_size,
self.distance_object,
self.distance_lens,
self.wave_proj,
self.red_factor,
),
shape,
)
[docs] def set_borders_zero_func(self, array_and_shape):
"""call convphase function with surface_tracking parameters
Parameters
----------
array_and_path : tuple containing array/phase [radians], shape of
the frame and path
Returns
-------
height_and_path : tuple containing array/height [m] and path
"""
array, shape = array_and_shape
(x_min, x_max) = shape
if x_max >= self.xmax:
x_max = self.xmax - 1
logger.warning("x_max adjusted")
newarray = np.zeros(self.ref.shape)
newarray[:, self.borders : -self.borders] = resize(
array[
:,
x_min
+ self.borders
- self.xmin : -(self.xmax - x_max + self.borders),
],
(self.ref.shape[0], self.xmax - 2 * self.borders - self.xmin),
)
return newarray
[docs] def convphase(self, phase, pix_size, dist, dist_p_c, wave_len, red_factor):
"""converts phase array into array of the height [m]
Parameters
----------
phase : float
the image phase [radians]
pix_size : float
size of the pixel [m/pixel]
dist : float
distance between object and camera [m]
dist_p_c : float
distance between projector and camera [m]
waven_len : float
wave length of the object [m]
red_factor: int
is the reduction factor
Returns
-------
height: float
array with the height calculated from the phase
Notes
-----
Make sure that the grid is parallel to y
"""
height = phase * dist / (phase - 2 * np.pi / wave_len * dist_p_c)
if self.correct_pos is True:
height = height.astype(float)
[ld, Ld] = phase.shape
x = (np.arange(Ld) - Ld / 2) * pix_size * red_factor
y = (np.arange(ld) - ld / 2) * pix_size * red_factor
[X, Y] = np.meshgrid(x, y)
# perform correction
dX = -X / dist * height
dY = -Y / dist * height
dX[1, :] = 0
dX[-1, :] = 0
dX[:, 1] = 0
dX[:-1] = 0
dY[1, :] = 0
dY[-1, :] = 0
dY[:, 1] = 0
dY[:-1] = 0
# interploate the values on the new grid
height = scipy.interpolate.griddata(
((X + dX).reshape(ld * Ld), (Y + dY).reshape(ld * Ld)),
height.reshape(Ld * ld),
(X, Y),
method="cubic",
)
if self.correct_height is True:
height = height - self.ref_height
# height = height-np.mean(np.mean(height))
return height
[docs] def correctcouple(self, queue_couple):
"""correct phase in order to avoid jump phase"""
((anglemod, shapemod), (angle, shape)) = queue_couple
fix_y = int(np.fix(self.l_y / 20 / self.red_factor))
fix_x = int(np.fix(self.l_x / 2 / self.red_factor))
correct_angle = angle
jump = angle[fix_y, fix_x] - anglemod[fix_y, fix_x]
while abs(jump) > np.pi:
correct_angle = correct_angle - np.sign(jump) * 2 * math.pi
jump = correct_angle[fix_y, fix_x] - anglemod[fix_y, fix_x]
print("angle corrected")
return (correct_angle, shape)
[docs] def wave_vector(self, ref, ymin, ymax, xmin, xmax, sur):
"""compute k_x value with mean reference frame
Parameters
----------
ref: int
frame with averaged
Returns
-------
wave_vector: float
average wave vector from the given frame
"""
Fref = np.fft.fft2(ref, ((ymax - ymin) * sur, (xmax - xmin) * sur))
kxma = np.arange(-(xmax - xmin) * sur / 2, (xmax - xmin) * sur / 2) / sur
indc = np.max(np.fft.fftshift(abs(Fref)), axis=0).argmax()
return abs(kxma[indc])
[docs] def process_ref(self):
"""calculate the reference height from a set of frames of a flat
plate with zero height
"""
frame_filtered = self.rectify_frame(self.ref, self.gain, self.filt)
inversed_filt = np.fft.ifft2(frame_filtered)
inversed_filt = inversed_filt[:: self.red_factor, :: self.red_factor]
ref_angle = np.unwrap(np.angle(inversed_filt), axis=1) # by lines
ref_angle = np.unwrap(ref_angle, axis=0) # by columsref
self.ref_height = np.zeros(ref_angle.shape)
return (
self.calculheight_func((ref_angle, ref_angle.shape))[0] - self.offset
)
if "sphinx" in sys.modules:
_params = WorkSurfaceTracking.create_default_params()
__doc__ += _params._get_formatted_docs()