Source code for fluidimage.works.optical_flow

"""Works optical flow
=====================

See https://docs.opencv.org/4.x/d4/dee/tutorial_optical_flow.html (and
https://github.com/groussea/opyflow by Gauthier Rousseau)

Provides

.. autoclass:: WorkOpticalFlow
   :members:
   :private-members:

"""

import numpy as np

from fluiddyn.util.paramcontainer import ParamContainer
from fluiddyn.util.serieofarrays import SerieOfArraysFromFiles
from fluidimage._opencv import cv2, error_import_cv2
from fluidimage.data_objects.piv import ArrayCouple, HeavyPIVResults

from . import BaseWorkFromSerie
from .with_mask import BaseWorkWithMask


def dict_from_params(params):
    return {k: v for k, v in params.__dict__.items() if not k.startswith("_")}


def optical_flow(
    im0,
    im1,
    feature_params,
    lk_params,
    threshold_diff_ab_ba=10,
    vmin=0,
    vmax=np.inf,
):
    positions0 = cv2.goodFeaturesToTrack(im0, **feature_params)

    positions1, st, err = cv2.calcOpticalFlowPyrLK(
        im0, im1, positions0, None, **lk_params
    )
    positions0r, st, err = cv2.calcOpticalFlowPyrLK(
        im1, im0, positions1, None, **lk_params
    )

    positions = positions0.reshape(-1, 2)
    displacements = positions1.reshape(-1, 2) - positions

    diff = abs(positions0 - positions0r).reshape(-1, 2).max(-1)

    correct_values = diff < threshold_diff_ab_ba
    positions = positions[correct_values]
    displacements = displacements[correct_values]

    if vmin is not None or vmax is not None:
        if vmin is None:
            vmin = 0

        if vmax is None:
            vmax = np.inf

        norm = np.sqrt(displacements[:, 0] ** 2 + displacements[:, 1] ** 2)
        cond = (vmin < norm) & (norm < vmax)
        positions = positions[cond]
        displacements = displacements[cond]

    return positions, displacements


[docs]class WorkOpticalFlow(BaseWorkWithMask, BaseWorkFromSerie):
[docs] @classmethod def create_default_params(cls): "Create an object containing the default parameters (class method)." params = ParamContainer(tag="params") cls._complete_params_with_default(params) return params
[docs] @classmethod def _complete_params_with_default(cls, params): if error_import_cv2: raise error_import_cv2 BaseWorkFromSerie._complete_params_with_default(params) params._set_child( "optical_flow", attribs=dict( winSize=(32, 32), maxLevel=3, criteria=( cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 50, 0.03, ), ), ) params.optical_flow._set_doc( """Parameters for the flow calculation using Lukas Kanade method See https://docs.opencv.org/3.0-beta/modules/video/doc/motion_analysis_and_object_tracking.html The algorithm is "pyramidal" (multiple passes). - winSize : Size of the search window at each pyramid level. - maxLevel : 0-based maximal pyramid level number; if set to 0, pyramids are not used (single level), if set to 1, two levels are used, and so on; if pyramids are passed to input then algorithm will use as many levels as pyramids have but no more than maxLevel. - criteria : Termination criteria of the iterative search algorithm (after the specified maximum number of iterations criteria.maxCount or when the search window moves by less than criteria.epsilon. """ ) params._set_child( "features", attribs=dict( maxCorners=70000, qualityLevel=0.09, minDistance=4, blockSize=16 ), ) params.features._set_doc( """Parameters for the Good Feature to Track algorithm (Shi-Tomasi Corner Detector) See https://docs.opencv.org/3.0-beta/modules/imgproc/doc/feature_detection.html#goodfeaturestotrack - maxCorners : Maximum number of corners to return. If there are more corners than are found, the strongest of them is returned. - qualityLevel : Parameter characterizing the minimal accepted quality of image corners. The parameter value is multiplied by the best corner quality measure, which is the minimal eigenvalue or the Harris function response. The corners with the quality measure less than the product are rejected. For example, if the best corner has the quality measure = 1500, and the qualityLevel=0.01 , then all the corners with the quality measure less than 15 are rejected. - minDistance : Minimum possible Euclidean distance between the returned corners. - blockSize : Size of an average block for computing a derivative covariation matrix over each pixel neighborhood.""" ) cls._complete_params_with_default_mask(params) params._set_child( "filters", attribs={ "threshold_diff_ab_ba": 1.0, "displacement_min": None, "displacement_max": None, }, doc=""" Parameters indicating how are detected and processed false vectors. - threshold_diff_ab_ba : 1. ??? - displacement_min : None Vectors smaller than `displacement_min` are considered as false vectors. - displacement_max : None Vectors larger than `displacement_max` are considered as false vectors. """, )
def __init__(self, params): super().__init__(params) self.dict_params_features = dict_from_params(self.params.features) self.dict_params_flow = dict_from_params(self.params.optical_flow)
[docs] def calcul(self, couple): if isinstance(couple, SerieOfArraysFromFiles): couple = ArrayCouple(serie=couple) elif isinstance(couple, dict): couple = ArrayCouple(**couple) if not isinstance(couple, ArrayCouple): raise ValueError couple.apply_mask(self.params.mask) im0, im1 = couple.get_arrays() positions, displacements = optical_flow( im0, im1, self.dict_params_features, self.dict_params_flow, threshold_diff_ab_ba=self.params.filters.threshold_diff_ab_ba, vmin=self.params.filters.displacement_min, vmax=self.params.filters.displacement_max, ) xs = positions[:, 0] ys = positions[:, 1] xs, ys = self._xyoriginalimage_from_xymasked(xs, ys) result = HeavyPIVResults( deltaxs=displacements[:, 0], deltays=displacements[:, 1], xs=xs, ys=ys, couple=couple, params=self.params, ) return result