Source code for crappy.tool.image_processing.dic_ve
# coding: utf-8
import numpy as np
from typing import List, Tuple
from ..._global import OptionalModule
from ..camera_config import SpotsBoxes, Box
try:
import cv2
except (ModuleNotFoundError, ImportError):
cv2 = OptionalModule("opencv-python")
[docs]
class DICVETool:
"""This class is the core of the :class:`~crappy.blocks.DICVE` Block.
It tracks patches on images received from a :class:`~crappy.camera.Camera`
object, and computes a strain value at each new image.
It relies on cross-correlation algorithms to calculate the displacement.
Different algorithms are available depending on the needs. This tool is
mainly used to perform video-extensometry on speckled surfaces, although it
could as well be of use for other applications.
.. versionadded:: 1.4.0
.. versionchanged:: 2.0.0 renamed from *DISVE* to *DICVETool*
"""
[docs]
def __init__(self,
patches: SpotsBoxes,
method: str = 'Disflow',
alpha: float = 3,
delta: float = 1,
gamma: float = 0,
finest_scale: int = 1,
iterations: int = 1,
gradient_iterations: int = 10,
patch_size: int = 8,
patch_stride: int = 3,
border: float = 0.2,
safe: bool = True,
follow: bool = True) -> None:
"""Sets a few attributes and initializes DISFlow if this method was
selected.
Args:
patches: An instance of the
:class:`~crappy.tool.camera_config.config_tools.SpotsBoxes` class,
containing the coordinates of the patches to track.
method: The method to use to calculate the displacement. `Disflow` uses
opencv's DISOpticalFlow and `Lucas Kanade` uses opencv's
calcOpticalFlowPyrLK, while all other methods are based on a basic
cross-correlation in the Fourier domain. `Pixel precision` calculates
the displacement by getting the position of the maximum of the
cross-correlation, and has thus a 1-pixel resolution. It is mainly
meant for debugging. `Parabola` refines the result of
`Pixel precision` by interpolating the neighborhood of the maximum, and
has thus a sub-pixel resolution.
.. versionadded:: 1.5.9
alpha: Weight of the smoothness term in DISFlow, as a :obj:`float`.
delta: Weight of the color constancy term in DISFlow, as a :obj:`float`.
gamma: Weight of the gradient constancy term in DISFlow , as a
:obj:`float`.
finest_scale: Finest level of the Gaussian pyramid on which the flow
is computed in DISFlow (`0` means full scale), as an :obj:`int`.
iterations: Maximum number of gradient descent iterations in the
patch inverse search stage in DISFlow, as an :obj:`int`.
gradient_iterations: Maximum number of gradient descent iterations
in the patch inverse search stage in DISFlow, as an :obj:`int`.
.. versionchanged:: 1.5.9
renamed from *gditerations* to *gradient_iterations*
patch_size: Size of an image patch for matching in DISFlow
(in pixels).
patch_stride: Stride between neighbor patches in DISFlow. Must be
less than patch size.
border: Crop the patch on each side according to this value before
calculating the displacements. 0 means no cropping, 1 means the entire
patch is cropped.
safe: If :obj:`True`, checks whether the patches aren't exiting the
image, and raises an error if that's the case.
follow: It :obj:`True`, the patches will move to follow the displacement
of the image.
.. versionremoved:: 1.5.10 *img0* and *show_image* arguments
"""
# These attributes are accessed by the parent class
self.patches = patches
self._offsets = [(0, 0) for _ in patches]
# Other attributes to set
self._method = method
self._border = border
self._safe = safe
self._follow = follow
# These attributes will be set later
self._img0 = None
self._height, self._width = None, None
# Initialize DISFlow if it is the selected method
if self._method == 'Disflow':
self._dis = cv2.DISOpticalFlow_create(cv2.DISOPTICAL_FLOW_PRESET_FAST)
self._dis.setVariationalRefinementIterations(iterations)
self._dis.setVariationalRefinementAlpha(alpha)
self._dis.setVariationalRefinementDelta(delta)
self._dis.setVariationalRefinementGamma(gamma)
self._dis.setFinestScale(finest_scale)
self._dis.setGradientDescentIterations(gradient_iterations)
self._dis.setPatchSize(patch_size)
self._dis.setPatchStride(patch_stride)
else:
self._dis = None
[docs]
def set_img0(self, img0: np.ndarray) -> None:
"""Sets the reference image for the cross-correlation.
.. versionadded:: 1.5.10
"""
self._img0 = img0
self._height, self._width, *_ = img0.shape
# Now that there's an initial image, checking that the patches are valid
if self._safe:
self._check_offsets()
[docs]
def calculate_displacement(
self, img: np.ndarray) -> Tuple[List[Tuple[float, float]], float, float,
List[Tuple[float, float]]]:
"""Returns the displacement of every patch, calculated according to the
chosen method.
Also updates the patch offsets if required, and updates the window for
following the patches if any.
.. versionadded:: 1.5.9
"""
# Making sure the reference image exists
if self._img0 is None:
raise ValueError("The method set_img0 must be called first for setting "
"the reference image !")
# Compute the displacement for each patch
displacements = []
for patch, offset in zip(self.patches, self._offsets):
if patch is None:
continue
if self._method == 'Disflow':
displacements.append(self._calc_disflow(patch, img, offset))
elif self._method == 'Pixel precision':
displacements.append(self._calc_pixel_precision(patch, img, offset))
elif self._method == 'Parabola':
displacements.append(self._calc_parabola(patch, img, offset))
elif self._method == 'Lucas Kanade':
displacements.append(self._calc_lucas_kanade(patch, img, offset))
else:
raise ValueError("Wrong method specified !")
# If required, updates the patch offsets
if self._follow:
for disp, (y_offset, x_offset), patch in zip(displacements,
self._offsets,
self.patches):
patch.x_start = round(patch.x_start + disp[0])
patch.x_end = round(patch.x_end + disp[0])
patch.y_start = round(patch.y_start + disp[1])
patch.y_end = round(patch.y_end + disp[1])
patch.x_centroid = (patch.x_end + patch.x_start) / 2
patch.y_centroid = (patch.y_end + patch.y_start) / 2
disp[0] += x_offset
disp[1] += y_offset
patch.x_disp = disp[0]
patch.y_disp = disp[1]
self._offsets = [(round(y_disp), round(x_disp)) for
x_disp, y_disp in displacements]
# Check that the patches are not exiting the image
if self._safe:
self._check_offsets()
else:
for (x_disp, y_disp), patch in zip(displacements, self.patches):
patch.x_disp = x_disp
patch.y_disp = y_disp
max_x = max(self.patches,
key=lambda patch_: patch_.x_centroid if patch_ is not None
else -float('inf'))
min_x = min(self.patches,
key=lambda patch_: patch_.x_centroid if patch_ is not None
else float('inf'))
max_y = max(self.patches,
key=lambda patch_: patch_.y_centroid if patch_ is not None
else -float('inf'))
min_y = min(self.patches,
key=lambda patch_: patch_.y_centroid if patch_ is not None
else float('inf'))
# If there are multiple spots, the x and y strains can be computed
if len(self.patches) > 1:
try:
exx = ((max_x.x_disp - min_x.x_disp) / self.patches.x_l0) * 100
except ZeroDivisionError:
exx = 0
try:
eyy = ((max_y.y_disp - min_y.y_disp) / self.patches.y_l0) * 100
except ZeroDivisionError:
eyy = 0
centers = [(patch.y_centroid, patch.x_centroid)
for patch in self.patches if patch is not None]
disps = [(patch.y_disp, patch.x_disp)
for patch in self.patches if patch is not None]
return centers, eyy, exx, disps
# If only one spot was detected, the strain isn't computed
else:
x = self.patches[0].x_centroid
y = self.patches[0].y_centroid
x_disp = self.patches[0].x_disp
y_disp = self.patches[0].y_disp
return [(y, x)], 0, 0, [(y_disp, x_disp)]
def _calc_disflow(self,
patch: Box,
img: np.ndarray,
offset: Tuple[int, int]) -> List[float]:
"""Returns the displacement between the original and the current image with
a sub-pixel precision, using DISFlow."""
disp_img = self._dis.calc(self._get_patch(self._img0, patch, offset),
self._get_patch(img, patch), None)
return np.average(self._trim_patch(disp_img), axis=(0, 1)).tolist()
def _calc_pixel_precision(self,
patch: Box,
img: np.ndarray,
offset: Tuple[int, int]) -> List[float]:
"""Returns the displacement between the original and the current image with
a precision limited to 1 pixel."""
cross_correl, max_width, max_height = self._cross_correlation(
self._get_patch(self._img0, patch, offset), self._get_patch(img, patch))
height, width = cross_correl.shape[0], cross_correl.shape[1]
return [-(max_width - width / 2), -(max_height - height / 2)]
def _calc_parabola(self,
patch: Box,
img: np.ndarray,
offset: Tuple[int, int]) -> List[float]:
"""Returns the displacement between the original and the current image with
a sub-pixel precision, using two parabola fits (one in x and one in y)."""
cross_correl, max_width, max_height = self._cross_correlation(
self._get_patch(self._img0, patch, offset), self._get_patch(img, patch))
height, width = cross_correl.shape[0], cross_correl.shape[1]
y_disp = -(max_height - height / 2)
x_disp = -(max_width - width / 2)
x_disp -= self._parabola_fit(cross_correl[max_height,
max_width - 1: max_width + 2])
y_disp -= self._parabola_fit(cross_correl[max_height - 1: max_height + 2,
max_width])
return [x_disp, y_disp]
def _calc_lucas_kanade(self,
patch: Box,
img: np.ndarray,
offset: Tuple[int, int]) -> List[float]:
"""Returns the displacement between the original and the current image with
a sub-pixel precision, using the Lucas Kanade algorithm."""
# Getting the center of the patch
x_top, x_bottom, y_left, y_right = patch.sorted()
center_y, center_x = (y_right - y_left) // 2, (x_bottom - x_top) // 2
next_, _, _ = cv2.calcOpticalFlowPyrLK(
self._get_patch(self._img0, patch, offset), self._get_patch(img, patch),
np.array([[center_x, center_y]]).astype('float32'), None)
new_x, new_y = np.squeeze(next_)
return [new_x - center_x, new_y - center_y]
@staticmethod
def _parabola_fit(arr: np.ndarray) -> float:
"""Returns the abscissa of the maximum of a parabola defined by 3
points in positions x=-1, x=0 and x=1.
Args:
arr: This array contains the y values for the 3 points.
"""
return (arr[0] - arr[2]) / (2 * (arr[0] - 2 * arr[1] + arr[2]))
@staticmethod
def _cross_correlation(img0: np.ndarray,
img1: np.ndarray) -> Tuple[np.ndarray, int, int]:
"""Performs a cross-correlation operation on two patches in the Fourier
domain.
Returns:
The result of the correlation in the real domain as an image, as well as
the position of the maximum of this image.
"""
# Getting the size of the image
height, width = img0.shape[0], img0.shape[1]
# Find the closest power of 2 length from the image
height_log2 = 2 ** int(np.log2(height + 0.5))
width_log2 = 2 ** int(np.log2(width + 0.5))
# Cropping the image to a power of 2
img0 = img0[(height - height_log2) // 2: (height + height_log2) // 2,
(width - width_log2) // 2: (width + width_log2) // 2]
img1 = img1[(height - height_log2) // 2: (height + height_log2) // 2,
(width - width_log2) // 2: (width + width_log2) // 2]
# Convert to Fourier for fast cross-correlation
img0_fourier = cv2.dft(img0.astype(np.float32),
flags=cv2.DFT_COMPLEX_OUTPUT)
img1_fourier = cv2.dft(img1.astype(np.float32),
flags=cv2.DFT_COMPLEX_OUTPUT)
# Compute cross-correlation by convolution
cross_fourier = cv2.mulSpectrums(img0_fourier, img1_fourier,
flags=0, conjB=True)
# Convert back to physical space
cross_shifted = cv2.idft(cross_fourier)
# Un-shift after FFT
cross_correl = np.fft.ifftshift(cross_shifted[:, :, 0])
# Find the maximum of the cross-correlation
max_width, max_height = cv2.minMaxLoc(cross_correl)[-1]
# Keep the average in case several maxima found
max_width, max_height = int(np.mean(max_width)), int(np.mean(max_height))
return cross_correl, max_width, max_height
@staticmethod
def _get_patch(img: np.ndarray,
patch: Box,
offset: Tuple[int, int] = (0, 0)) -> np.ndarray:
"""Returns the part of the image corresponding to the given patch at the
given offset."""
y_off, x_off = offset
x_top, x_bottom, y_left, y_right = patch.sorted()
return np.array(img[y_left - y_off: y_right - y_off,
x_top - x_off: x_bottom - x_off])
def _trim_patch(self, patch: np.ndarray) -> np.ndarray:
"""Trims the border of a patch according to the value set by the user, and
returns the sub image corresponding to the trimmed patch."""
height, width, *_ = patch.shape
return patch[int(height * self._border / 2):
int(height * (1 - self._border / 2)),
int(width * self._border / 2):
int(width * (1 - self._border / 2))]
def _check_offsets(self) -> None:
"""Check if the patches are still within the image, and raises an error if
one of them is out."""
for patch in self.patches:
if patch is None:
continue
x_top, x_bottom, y_left, y_right = patch.sorted()
# Checking the left border
if x_top < 0:
raise RuntimeError("Region exiting the ROI (left)")
# Checking the right border
elif x_bottom > self._width:
raise RuntimeError("Region exiting the ROI (right)")
# Checking the top border
if y_left < 0:
raise RuntimeError("Region exiting the ROI (top)")
# Checking the bottom border
elif y_right > self._height:
raise RuntimeError("Region exiting the ROI (bottom)")