Source code for fluidimage.reconstruct.tomo.mlos

"""MLOS (Multiplicative Line of Sight)
======================================

Reference::

    C. Atkinson and J. Soria, “An efficient simultaneous reconstruction
    technique for tomographic particle image velocimetry,” Exp Fluids, vol. 47,
    no. 4–5, p. 553, Oct. 2009.


.. autoclass:: TomoMLOSBase
   :members:
   :private-members:

.. autoclass:: TomoMLOSRbf
   :members:
   :private-members:

.. autoclass:: TomoMLOSNeighbour
   :members:
   :private-members:

.. autoclass:: TomoMLOSCV
   :members:
   :private-members:

"""

import os
from pathlib import Path

import dask.array as da
import matplotlib.pyplot as plt
import numpy as np
from dask.diagnostics import ProgressBar
from psutil import virtual_memory
from scipy import interpolate, sparse

from fluidimage._opencv import cv2, error_import_cv2
from fluidimage.calibration.calib_cv import CalibCV
from fluidimage.data_objects.tomo import ArrayTomoCV
from fluidimage.util import imread


[docs]class TomoMLOSBase: """MLOS can be summarized in the following steps: 1. Project the world coordinates to pixel coordinates. 2. Interpolate intensity of the neighbouring pixels. 3. Project back the interpolated intesities onto world coordinates and apply them multiplicatively. """ def __init__(self, cls_calib, cls_array, *cams, **kwargs): self.array = cls_array(params=None, **kwargs) self.cams = [] for cam in cams: cam_name = Path(cam).name.split(".")[0] self.cams.append(cam_name) setattr(self, cam_name, cls_calib(cam))
[docs] def reconstruct( self, pix: dict, image: np.ndarray, threshold: float, chunks: tuple, save: bool, ): """Performs MLOS reconstruction parallely using Dask. The reconstruction is done in memory when `save=False` and in the filesystem when `save=True`. """ self.array.image_path = image interp = self.get_interpolator(image, threshold) with ProgressBar(): x_pix = da.from_array(pix["x"], chunks=chunks) y_pix = da.from_array(pix["y"], chunks=chunks) i_vox = da.map_blocks(interp, x_pix, y_pix) i_vox = i_vox ** (1.0 / len(self.cams)) if save: if os.path.exists(self.array.h5file_path): # Create a temporary copy to read from file, dset, h5file_tmp = self.array.load_dataset(copy=True) self.array.I = da.from_array(dset, chunks=chunks) self.array.I *= i_vox self.array.save() file.close() os.remove(h5file_tmp) else: self.array.I = i_vox self.array.save() else: self.array.I *= i_vox.compute()
[docs] def verify_projection(self, cam="cam0", skip=1): """Graphically verify the projection performed by `phys2pix` method.""" fig, axes = plt.subplots(1, 3, figsize=(15, 4)) cmap = plt.get_cmap("gray") pix = self.phys2pix(cam) titles = [r + r"$_{world}$" for r in ("x", "y", "z")] for i, axt in enumerate(zip(axes, titles)): ax, title = axt try: r_world = self.array.grid[::skip, i] except AttributeError: key = ("X", "Y", "Z")[i] r_world = getattr(self.array, key).flatten()[::skip] color = cmap(r_world / r_world.max()) ax.scatter( pix["x"].flatten()[::skip], pix["y"].flatten()[::skip], c=color ) ax.set_title(title) # ax.pcolor(pix["x"], pix["y"], r_world) ax.set_xlabel(r"$x_{pix}$") ax.set_ylabel(r"$y_{pix}$")
[docs]class TomoMLOSRbf(TomoMLOSBase): """Interpolation is calculated using a radial basis function (RBF) interpolation with a gaussian kernel. """ def get_interpolator(self, image, threshold): im = imread(image) if threshold is not None: im[im < threshold] = 0.0 im_sparse = sparse.coo_matrix(im) # Interpolation function return interpolate.Rbf( im_sparse.col, im_sparse.row, im_sparse.data, function="gaussian" )
[docs]class TomoMLOSNeighbour(TomoMLOSBase): """Interpolation is calculated using a nearest neighbour interpolation.""" def get_interpolator(self, image, threshold): im = imread(image) if threshold is not None: im[im < threshold] = 0.0 im_sparse = sparse.coo_matrix(im) # Interpolation function return interpolate.NearestNDInterpolator( (im_sparse.col, im_sparse.row), im_sparse.data )
[docs]class TomoMLOSCV(TomoMLOSNeighbour): def __init__(self, *cams, **kwargs): super().__init__(CalibCV, ArrayTomoCV, *cams, **kwargs) if error_import_cv2: raise error_import_cv2
[docs] def phys2pix(self, cam_name: str): """Tranform the 'physical' world coordinates to 'pixel' coordinates.""" cam = getattr(self, cam_name) grid3d = self.array.grid pix = np.empty((len(grid3d), 1, 2)) for i, z in enumerate(self.array.zs): grid_zslice = grid3d[grid3d[:, 2] == z] n = len(grid_zslice) istart = i * n iend = (i + 1) * n rotation = cam.get_rotation(z) translate = cam.get_translate(z) pix[istart:iend, ...], jac = cv2.projectPoints( grid_zslice, rotation, translate, cam.params.cam_mtx, cam.params.kc, ) return {"x": pix[:, 0, 0], "y": pix[:, 0, 1]}
[docs] def reconstruct( self, pix: dict, image: np.ndarray, threshold=None, chunks=None, save=False, ): """Estimate the maximum size of chunk which can fit in the memory and execute the parent method. """ if chunks is None: chunks = len(pix["x"]) // os.cpu_count() nmax = int(_estimate_max_array_size(self.array.dtype)) // 6 if chunks > nmax: chunks = nmax super().reconstruct(pix, image, threshold, chunks, save)
def _estimate_max_array_size(dtype=np.float64): mem = virtual_memory() nbytes = np.array([0], dtype=dtype).nbytes return mem.available // nbytes if __name__ == "__main__": import shutil from fluidimage import get_path_image_samples path_image_samples = get_path_image_samples() path = path_image_samples / "TomoPIV" / "calibration" cameras = [str(path / f"cam{i}.h5") for i in range(4)] tomo = TomoMLOSCV( *cameras, xlims=(-10, 10), ylims=(-10, 10), zlims=(-5, 5), nb_voxels=(20, 20, 10), ) tomo.verify_projection() particle_images = path_image_samples / "TomoPIV" / "particle" output_dir = path_image_samples / "TomoPIV" / "tomo" if output_dir.exists(): shutil.rmtree(output_dir, ignore_errors=True) for cam in tomo.cams: print(f"Projecting {cam}...") pix = tomo.phys2pix(cam) i0 = 1 for i1 in ["a", "b"]: image = str(particle_images / f"{cam}.pre" / f"im{i0:05.0f}{i1}.tif") tomo.array.init_paths(image) print(f"MLOS of {cam} on {image}: reconstructing...") tomo.reconstruct(pix, image, threshold=None, save=True)