Source code for crappy.tool.image_processing.gpu_correl
# coding:utf-8
import warnings
from math import ceil
import numpy as np
from pkg_resources import resource_filename
from typing import Any, Tuple, Optional, Union, List
from pathlib import Path
from itertools import chain
import logging
from .fields import get_field
from ..._global import OptionalModule
try:
import pycuda.tools
import pycuda.driver
import pycuda.compiler
import pycuda.gpuarray as gpuarray
import pycuda.reduction
except (ImportError, ModuleNotFoundError):
pycuda = OptionalModule("pycuda",
"PyCUDA and CUDA are necessary to use GPUCorrel")
gpuarray = OptionalModule("pycuda",
"PyCUDA and CUDA are necessary to use GPUCorrel")
def interp_nearest(arr: np.ndarray, ny: int, nx: int) -> np.ndarray:
"""Reshapes an input array to the specified dimension using the nearest
interpolation, using only :mod:`numpy` methods.
Args:
arr: The array to interpolate.
ny: The new dimension along the `y` axis.
nx: The new dimension along the `y` axis.
Returns:
A reshaped version of the input array obtained with the nearest
interpolation.
.. versionadded:: 1.4.0
"""
# If the shape is already fine, nothing more to do
if arr.shape == (ny, nx):
return arr
y, x = arr.shape
rx, ry = x / nx, y / ny
out = np.empty((ny, nx), dtype=np.float32)
# Filling individually each value of the returned array
for j in range(ny):
for i in range(nx):
out[j, i] = arr[int(ry * j + .5), int(rx * i + .5)]
return out
class CorrelStage:
"""Represents a stage of the pyramid used by the :class:`GPUCorrelTool` for
performing GPU correlation.
This class actually performs the GPU computation, while the calling classes
only manage the data.
.. versionadded:: 1.4.0
"""
def __init__(self,
img_size: Tuple[int, int],
logger_name: str,
verbose: int = 0,
iterations: int = 5,
mul: float = 3,
n_fields: Optional[int] = None,
kernel_file: Optional[Union[Path, str]] = None) -> None:
"""Sets the args and instantiates the :mod:`pycuda` objects.
Args:
img_size: The shape of the images to process. It is given beforehand so
that the memory can be allocated before the test starts.
logger_name: The name of the parent logger, as a :obj:`str`.
.. versionadded:: 2.0.0
verbose: The verbose level as an integer, between `0` and `3`. At level
`0` no information is displayed, and at level `3` so much information
is displayed that is slows the code down.
iterations: The maximum number of iterations to run before returning the
results. The results may be returned before if the residuals start
increasing.
mul: The scalar by which the direction will be multiplied before being
added to the solution. If it's too high, the convergence will be fast
but there's a risk that to go past the solution and to diverge. If it's
too low, the convergence will be slower and require more iterations.
`3` was found to be an acceptable value in most cases, but it is
recommended to tune this value for each application so that the
convergence is neither too slow nor too fast.
n_fields: The number of fields to project the displacement on.
.. versionchanged:: 1.5.10 renamed from *Nfields* to *n_fields*
kernel_file: The path to the file containing the kernels to use for the
correlation. Can be a :obj:`pathlib.Path` object or a :obj:`str`.
.. versionchanged:: 1.5.10
now explicitly listing the *verbose*, *iterations*, *mul* and
*kernel_file* arguments
.. versionremoved:: 1.5.10 *show_diff*, *img*, *mask* and *fields* arguments
"""
self._logger: Optional[logging.Logger] = None
self._logger_name = logger_name
# Setting the args
self._verbose = verbose
self._iterations = iterations
self._mul = mul
self._n_fields = n_fields
self._ready = False
self._height, self._width = img_size
self._debug(2, f"Initializing with resolution {img_size}")
# These attributes will be set later
self._r_x, self._r_y = None, None
self._mask_array = None
self._array = None
self._array_d = None
self._fields = False
self._r_grid = None
self._r_block = None
self._dev_r_out = None
self.res = None
# Setting the grid and the block for the kernel
self._grid = (int(ceil(self._width / 32)), int(ceil(self._height / 32)))
self._block = (int(ceil(self._width / self._grid[0])),
int(ceil(self._height / self._grid[1])), 1)
self._debug(3, f"Default grid: {self._grid}, block: {self._block}")
# dev_g stores the G arrays (to compute the research direction)
self._dev_g = [gpuarray.empty(img_size, np.float32)
for _ in range(self._n_fields)]
# dev_fields_x/y store the fields value along X and Y
self._dev_fields_x = [gpuarray.empty(img_size, np.float32)
for _ in range(self._n_fields)]
self._dev_fields_y = [gpuarray.empty(img_size, np.float32)
for _ in range(self._n_fields)]
# dev_h Stores the Hessian matrix
self._dev_h = np.zeros((self._n_fields, self._n_fields), np.float32)
# And dev_h_i stores its invert
self.dev_h_i = gpuarray.empty((self._n_fields, self._n_fields), np.float32)
# dev_out is written with the difference of the images
self._dev_out = gpuarray.empty((self._height, self._width), np.float32)
# dev_x stores the value of the parameters (what is actually computed)
self.dev_x = gpuarray.empty(self._n_fields, np.float32)
# dev_vec stores the research direction
self._dev_vec = gpuarray.empty(self._n_fields, np.float32)
# dev_orig stores the original image on the device
self.dev_orig = gpuarray.empty(img_size, np.float32)
# dev_grad_x store the gradient along X of the original image on the device
self._dev_grad_x = gpuarray.empty(img_size, np.float32)
# And along Y
self._dev_grad_y = gpuarray.empty(img_size, np.float32)
# Opening the kernel file and compiling the module
if kernel_file is None:
self._debug(2, "Kernel file not specified")
kernel_file = resource_filename('crappy',
'tool/image_processing/kernels.cu')
with open(kernel_file, "r") as file:
self._debug(3, "Sourcing module")
mod = pycuda.compiler.SourceModule(file.read() % (self._width,
self._height,
self._n_fields))
# Extracting individual functions from the data of the kernel file
self._resample_orig_krnl = mod.get_function('resampleO')
self._resample_krnl = mod.get_function('resample')
self._gradient_krnl = mod.get_function('gradient')
self._make_g_krnl = mod.get_function('makeG')
self._make_diff = mod.get_function('makeDiff')
self._dot_krnl = mod.get_function('myDot')
self._add_krnl = mod.get_function('kadd')
self._mul_red_krnl = pycuda.reduction.ReductionKernel(
np.float32, neutral="0", reduce_expr="a+b", map_expr="x[i]*y[i]",
arguments="float *x, float *y")
self._least_square = pycuda.reduction.ReductionKernel(
np.float32, neutral="0", reduce_expr="a+b", map_expr="x[i]*x[i]",
arguments="float *x")
# Getting the texture references
self._tex = mod.get_texref('tex')
self._tex_d = mod.get_texref('tex_d')
self._tex_mask = mod.get_texref('texMask')
# All textures use normalized coordinates except for the mask
for t in (self._tex, self._tex_d):
t.set_flags(pycuda.driver.TRSF_NORMALIZED_COORDINATES)
for t in (self._tex, self._tex_d, self._tex_mask):
t.set_filter_mode(pycuda.driver.filter_mode.LINEAR)
t.set_address_mode(0, pycuda.driver.address_mode.BORDER)
t.set_address_mode(1, pycuda.driver.address_mode.BORDER)
# Preparing kernels for less overhead when called
self._resample_orig_krnl.prepare("Pii", texrefs=[self._tex])
self._resample_krnl.prepare("Pii", texrefs=[self._tex_d])
self._gradient_krnl.prepare("PP", texrefs=[self._tex])
self._make_diff.prepare("PPPP",
texrefs=[self._tex, self._tex_d, self._tex_mask])
self._add_krnl.prepare("PfP")
def set_orig(self, img) -> None:
"""Sets the original image from a given CPU or GPU array"""
if isinstance(img, np.ndarray):
self._debug(3, "Setting original image from ndarray")
self.dev_orig.set(img)
elif isinstance(img, gpuarray.GPUArray):
self._debug(3, "Setting original image from GPUArray")
self.dev_orig = img
else:
raise ValueError("Error ! Unknown type of data given to set_orig()")
self.update_orig()
def update_orig(self) -> None:
"""Updates the original image."""
self._debug(3, "Updating original image")
self._array = pycuda.driver.gpuarray_to_array(self.dev_orig, 'C')
self._tex.set_array(self._array)
self._compute_gradients()
self._ready = False
def prepare(self) -> None:
"""Computes all the necessary tables to perform correlation."""
# Sets the mask array if none was specified
if self._mask_array is None:
self._debug(2, "No mask set when preparing, using a basic one, with a "
"border of 5% the dimension")
mask = np.zeros((self._height, self._width), np.float32)
mask[self._height // 20: -self._height // 20,
self._width // 20: -self._width // 20] = 1
self.set_mask(mask)
# Only necessary to prepare if not already ready
if not self._ready:
# Checking that everything's set for preparing
if self._array is None:
self._debug(1, "Tried to prepare but original texture is not set !")
elif not self._fields:
self._debug(1, "Tried to prepare but fields are not set !")
# Actually computing the tables
else:
self._make_g()
self._make_h()
self._ready = True
self._debug(3, "Ready!")
else:
self._debug(1, "Tried to prepare when unnecessary, doing nothing...")
def resample_orig(self, new_y: int, new_x: int, dev_out) -> None:
"""Resamples the original image with a new dimension."""
# New grid and block to be used
grid = (int(ceil(new_x / 32)), int(ceil(new_y / 32)))
block = (int(ceil(new_x / grid[0])), int(ceil(new_y / grid[1])), 1)
self._debug(3, f"Resampling Orig texture, grid: {grid}, block: {block}")
# Now resampling
self._resample_orig_krnl.prepared_call(self._grid, self._block,
dev_out.gpudata,
np.int32(new_x), np.int32(new_y))
self._debug(3, f"Resampled original texture to {dev_out.shape}")
def resample_d(self, new_y: int, new_x: int) -> Any:
"""Resamples the texture with a new dimension and returns it in a GPU
array."""
if (self._r_x, self._r_y) != (np.int32(new_x), np.int32(new_y)):
self._r_grid = (int(ceil(new_x / 32)), int(ceil(new_y / 32)))
self._r_block = (int(ceil(new_x / self._r_grid[0])),
int(ceil(new_y / self._r_grid[1])), 1)
self._r_x, self._r_y = np.int32(new_x), np.int32(new_y)
self._dev_r_out = gpuarray.empty((new_y, new_x), np.float32)
self._debug(3, f"Resampling img_d texture to {new_y, new_x}, "
f"grid: {self._r_grid}, block:{self._r_block}")
self._resample_krnl.prepared_call(self._r_grid, self._r_block,
self._dev_r_out.gpudata,
self._r_x, self._r_y)
return self._dev_r_out
def set_fields(self, fields_x, fields_y) -> None:
"""Sets the fields on which hto project the displacement."""
self._debug(2, "Setting fields")
if isinstance(fields_x, np.ndarray):
self._dev_fields_x.set(fields_x)
self._dev_fields_y.set(fields_y)
elif isinstance(fields_x, gpuarray.GPUArray):
self._dev_fields_x = fields_x
self._dev_fields_y = fields_y
self._fields = True
def set_image(self, img_d) -> None:
"""Set the current image, to be compared with the reference image."""
if isinstance(img_d, np.ndarray):
self._debug(3, "Creating texture from numpy array")
self._array_d = pycuda.driver.matrix_to_array(img_d, "C")
elif isinstance(img_d, gpuarray.GPUArray):
self._debug(3, "Creating texture from gpuarray")
self._array_d = pycuda.driver.gpuarray_to_array(img_d, "C")
else:
self._debug(0, "Error ! Unknown type of data given to .set_image()")
raise ValueError
self._tex_d.set_array(self._array_d)
self.dev_x.set(np.zeros(self._n_fields, dtype=np.float32))
def set_mask(self, mask) -> None:
"""Sets the mask to use for weighting the images."""
self._debug(3, "Setting the mask")
if not mask.dtype == np.float32:
self._debug(2, "Converting the mask to float32")
mask = mask.astype(np.float32)
if isinstance(mask, np.ndarray):
self.maskArray = pycuda.driver.matrix_to_array(mask, 'C')
elif isinstance(mask, gpuarray.GPUArray):
self.maskArray = pycuda.driver.gpuarray_to_array(mask, 'C')
else:
raise ValueError("Error! Mask data type not understood")
self._tex_mask.set_array(self.maskArray)
def set_disp(self, x) -> None:
"""Sets the displacement fields computed from the previous stages."""
if isinstance(x, gpuarray.GPUArray):
self.dev_x = x
elif isinstance(x, np.ndarray):
self.dev_x.set(x)
else:
raise ValueError("Error! Unknown type of data given to "
"CorrelStage.set_disp")
def get_disp(self, img_d=None) -> Any:
"""Projects the displacement on the base fields, and returns the result."""
self._debug(3, "Calling main routine")
if not self._ready:
self._debug(2, "Wasn't ready ! Preparing...")
self.prepare()
if img_d is not None:
self.set_image(img_d)
if self._array_d is None:
raise ValueError("Did not set the image, use set_image() before calling "
"get_disp !")
self._debug(3, "Computing first diff table")
self._make_diff.prepared_call(self._grid, self._block,
self._dev_out.gpudata,
self.dev_x.gpudata,
self._dev_fields_x.gpudata,
self._dev_fields_y.gpudata)
self.res = self._least_square(self._dev_out).get()
self._debug(3, f"res: {self.res / 1e6}")
for i in range(self._iterations):
self._debug(3, f"Iteration {i}")
# Computing the direction of the gradient of each parameter
for j in range(self._n_fields):
self._dev_vec[j] = self._mul_red_krnl(self._dev_g[j], self._dev_out)
# Newton method: the gradient vector is multiplied by the pre-inverted
# Hessian, self._dev_vec now contains the actual research direction
self._dot_krnl(self.dev_h_i, self._dev_vec,
grid=(1, 1), block=(self._n_fields, 1, 1))
# This line simply adds mul times the research direction to self._dev_x
# with a really simple kernel (does self._dev_x += mul * self._dev_vec)
self._add_krnl.prepared_call((1, 1), (self._n_fields, 1, 1),
self.dev_x.gpudata, self._mul,
self._dev_vec.gpudata)
# Simple way to prevent unnecessary copy of data
if self._verbose >= 3:
self._debug(3, f"Direction: {self._dev_vec.get()}")
self._debug(3, f"New X: {self.dev_x.get()}")
# Getting the new residuals
self._make_diff.prepared_call(self._grid, self._block,
self._dev_out.gpudata,
self.dev_x.gpudata,
self._dev_fields_x.gpudata,
self._dev_fields_y.gpudata)
prev_res = self.res
self.res = self._least_square(self._dev_out).get()
# Handling the case when the residuals start increasing
if self.res >= prev_res:
self._debug(3, f"Diverting from the solution "
f"new res={self.res / 1e6} >= {prev_res / 1e6}!")
self._add_krnl.prepared_call((1, 1), (self._n_fields, 1, 1),
self.dev_x.gpudata,
-self._mul,
self._dev_vec.gpudata)
self.res = prev_res
self._debug(3, f"Undone: X={self.dev_x.get()}")
break
self._debug(3, f"res: {self.res / 1e6}")
return self.dev_x.get()
def get_data_display(self) -> np.ndarray:
"""Returns the necessary data for displaying the difference between the
reference and current image."""
return (self._dev_out.get() + 128).astype(np.uint8)
def _make_g(self) -> None:
"""Computes the tables."""
for i in range(self._n_fields):
self._make_g_krnl(self._dev_g[i].gpudata, self._dev_grad_x.gpudata,
self._dev_grad_y.gpudata,
self._dev_fields_x[i], self._dev_fields_y[i],
block=self._block, grid=self._grid)
def _make_h(self) -> None:
"""Computes the tables."""
for i in range(self._n_fields):
for j in range(i + 1):
self._dev_h[i, j] = self._mul_red_krnl(self._dev_g[i],
self._dev_g[j]).get()
if i != j:
self._dev_h[j, i] = self._dev_h[i, j]
self._debug(3, f"Hessian: {self._dev_h}")
self.dev_h_i.set(np.linalg.inv(self._dev_h))
if self._verbose >= 3:
self._debug(3, f"Inverted Hessian: {self.dev_h_i.get()}")
def _debug(self, level: int, msg: str) -> None:
"""Displays the provided debug message only if its debug level is lower
than or equal to the verbose level."""
if self._logger is None:
self._logger = logging.getLogger(
f"{self._logger_name}.{type(self).__name__}")
if level <= self._verbose:
self._logger.log(logging.INFO, msg)
def _compute_gradients(self) -> None:
"""Wrapper to call the gradient kernel."""
self._gradient_krnl.prepared_call(self._grid, self._block,
self._dev_grad_x.gpudata,
self._dev_grad_y.gpudata)
[docs]
class GPUCorrelTool:
"""This class is the core of the :class:`~crappy.blocks.GPUCorrel` and
:class:`~crappy.blocks.GPUVE` Blocks.
It receives images from a :class:`~crappy.camera.Camera`, and performs
GPU-accelerated image correlation on each received image. From this
correlation, rigid body displacements or other fields are identified.
This class is meant to be efficient enough to run in real-time. It relies on
the :class:`CorrelStage` class (not documented) to perform correlation on
different scales. It mainly takes a list of base fields and a reference image
as inputs, and project the displacement between the current image and the
reference one on the base of fields. The optimal fit is achieved by lowering
the residuals with a least-squares method.
The projection on the base is performed sequentially, using the results
obtained at stages with low resolution to initialize the computation on
stages with higher resolution. A Newton method is used to converge towards
an optimal solution.
.. versionadded:: 1.4.0
.. versionchanged:: 2.0.0 renamed from *GPUCorrel* to *GPUCorrelTool*
"""
context = None
[docs]
def __init__(self,
logger_name: str,
context: Optional[Any] = None,
verbose: int = 0,
levels: int = 5,
resampling_factor: float = 2,
kernel_file: Optional[Union[str, Path]] = None,
iterations: int = 4,
fields: Optional[List[Union[str, np.ndarray]]] = None,
ref_img: Optional[np.ndarray] = None,
mask: Optional[np.ndarray] = None,
mul: float = 3) -> None:
"""Sets the args and a few parameters of :mod:`pycuda`.
Args:
logger_name: The name of the parent :obj:`~logging.Logger`, to be used
for setting the Logger of the class.
.. versionadded:: 2.0.0
context: Optionally, the :mod:`pycuda` context to use. If not specified,
a new context is instantiated.
.. versionadded:: 1.5.10
verbose: The verbose level as an integer, between `0` and `3`. At level
`0` no information is displayed, and at level `3` so much information
is displayed that it slows the code down.
levels: Number of levels of the pyramid. More levels may help converging
on images with large strain, but may fail on images that don't contain
low spatial frequency. Fewer levels mean that the program runs faster.
resampling_factor: The factor by which the resolution is divided between
each stage of the pyramid. A low factor ensures coherence between the
stages, but is more computationally intensive. A high factor allows
reaching a finer detail level, but may lead to a coherence loss between
the stages.
kernel_file: The path to the file containing the kernels to use for the
correlation. Can be a :obj:`pathlib.Path` object or a :obj:`str`. If
not provided, the default :ref:`GPU Kernels` are used.
iterations: The maximum number of iterations to run before returning the
results. The results may be returned before if the residuals start
increasing.
fields: The base of fields to use for the projection, given as a
:obj:`list` of :obj:`str` or :mod:`numpy` arrays (both types can be
mixed). Strings are for using automatically-generated fields, the
available ones are :
::
'x', 'y', 'r', 'exx', 'eyy', 'exy', 'eyx', 'exy2', 'z'
If users provide their own fields as arrays, they will be used as-is to
run the correlation. The user-provided fields must be of shape:
::
(patch_height, patch_width, 2)
.. versionchanged:: 2.0.5 provided fields can now be numpy arrays
ref_img: The reference image, as a 2D :obj:`numpy.array` with `dtype`
`float32`. It can either be given at :meth:`__init__`, or set later
with :meth:`set_orig`.
.. versionchanged:: 1.5.10 renamed from *img* to *ref_img*
mask: The mask used for weighting the region of interest on the image. It
is generally used to prevent unexpected behavior on the border of the
image.
mul: The scalar by which the direction will be multiplied before being
added to the solution. If it's too high, the convergence will be fast
but there's a risk to go past the solution and to diverge. If it's too
low, the convergence will be slower and require more iterations. `3`
was found to be an acceptable value in most cases, but it is
recommended to tune this value for each application so that the
convergence is neither too slow nor too fast.
.. versionchanged:: 1.5.10
now explicitly listing the *verbose*, *levels*, *resampling_factor*,
*kernel_file*, *iterations*, *fields*, *mask* and *mul* arguments
.. versionremoved:: 1.5.10 *show_diff* and *img_size* arguments
"""
self._context = context
self._logger: Optional[logging.Logger] = None
self._logger_name = logger_name
self._verbose = verbose
self._debug(3, "You set the verbose level to the maximum.\nIt may help "
"finding bugs or tracking errors but it may also impact the"
" program performance as it will display A LOT of output "
"and add GPU->CPU copies only to display information.\nIf "
"it is not desired, consider lowering the verbosity: 1 or "
"2 is a reasonable choice, 0 won't show anything except "
"errors.")
# Setting the args
self._levels = levels
self._resampling_factor = resampling_factor
self._iterations = iterations
self._mul = mul
self._fields = fields
self._n_fields = len(fields)
self._mask = mask
self._ref_img = ref_img
self._loops = 0
self._last = None
# These attributes will be set later
self._heights, self._widths = None, None
self._stages = None
self._tex_fx = None
self._tex_fy = None
self._resample = None
self._debug(1, f"Initializing... levels: {levels}, verbosity: {verbose}")
# Getting the kernel file
if kernel_file is None:
self._debug(3, "Kernel file not specified, using the default one of "
"crappy")
self._kernel_file = resource_filename('crappy',
'tool/image_processing/kernels.cu')
else:
self._kernel_file = kernel_file
self._debug(3, f"Kernel file:{self._kernel_file}")
[docs]
def set_img_size(self, img_size: Tuple[int, int]) -> None:
"""Sets the image shape, and calls the methods that need this information
for running."""
self._debug(1, f"Setting master resolution: {img_size},")
# Initializing the pycuda context
if self._context is not None:
GPUCorrelTool.context = self._context
else:
pycuda.driver.init()
GPUCorrelTool.context = pycuda.tools.make_default_context()
src_txt = """
texture<float, cudaTextureType2D, cudaReadModeElementType> texFx{0};
texture<float, cudaTextureType2D, cudaReadModeElementType> texFy{0};
__global__ void resample{0}(float* outX, float* outY, int x, int y)
{{
int idx = blockIdx.x*blockDim.x+threadIdx.x;
int idy = blockIdx.y*blockDim.y+threadIdx.y;
if(idx < x && idy < y)
{{
outX[idy*x+idx] = tex2D(texFx{0},(float)idx/x, (float)idy/y);
outY[idy*x+idx] = tex2D(texFy{0},(float)idx/x, (float)idy/y);
}}
}}
"""
# Setting the textures for a faster resampling
src = ''.join([src_txt.format(i) for i in range(self._n_fields)])
source_module = pycuda.compiler.SourceModule(src)
self._tex_fx = [source_module.get_texref(f"texFx{i}")
for i in range(self._n_fields)]
self._tex_fy = [source_module.get_texref(f"texFy{i}")
for i in range(self._n_fields)]
self._resample = [source_module.get_function(f"resample{i}")
for i in range(self._n_fields)]
for tex_fx, tex_fy, resample in zip(self._tex_fx, self._tex_fy,
self._resample):
resample.prepare('PPii', texrefs=[tex_fx, tex_fy])
for tex in chain(self._tex_fx, self._tex_fy):
tex.set_flags(pycuda.driver.TRSF_NORMALIZED_COORDINATES)
tex.set_filter_mode(pycuda.driver.filter_mode.LINEAR)
tex.set_address_mode(0, pycuda.driver.address_mode.BORDER)
tex.set_address_mode(1, pycuda.driver.address_mode.BORDER)
# Setting the dimensions for each stage
height, width, *_ = img_size
self._heights = [round(height / (self._resampling_factor ** i))
for i in range(self._levels)]
self._widths = [round(width / (self._resampling_factor ** i))
for i in range(self._levels)]
# Initializing all the stages
self._stages = [CorrelStage(img_size=(height, width),
logger_name=f"{self._logger_name}."
f"{type(self).__name__}",
verbose=self._verbose,
n_fields=self._n_fields,
iterations=self._iterations,
mul=self._mul,
kernel_file=self._kernel_file)
for i, (height, width) in enumerate(zip(self._heights,
self._widths))]
# Now that the stages exist, setting the reference image, fields, and mask
if self._ref_img is not None:
self.set_orig(self._ref_img)
if self._fields is not None:
self._set_fields(self._fields)
if self._mask is not None:
self._set_mask(self._mask)
[docs]
def set_orig(self, img: np.ndarray) -> None:
"""Sets the reference image, to which the following images will be
compared."""
self._debug(2, "Updating the original image")
# Casting to float32 if needed
if img.dtype != np.float32:
warnings.warn("Correl() takes arrays with dtype np.float32 to allow GPU "
"computing (got {}). Converting to float32."
.format(img.dtype), RuntimeWarning)
img = img.astype(np.float32)
# Setting the reference image for all stages
self._stages[0].set_orig(img)
for prev_stage, stage, height, width in zip(self._stages[:-1],
self._stages[1:],
self._heights, self._widths):
prev_stage.resample_orig(height, width, stage.dev_orig)
stage.update_orig()
[docs]
def prepare(self) -> None:
"""Prepares all the stages before starting the test."""
for stage in self._stages:
stage.prepare()
self._debug(2, "Ready !")
[docs]
def get_disp(self, img_d: Optional[np.ndarray] = None) -> Any:
"""To get the displacement.
This will perform the correlation routine on each stage, initializing with
the previous values every time it will return the computed parameters
as a list.
"""
if img_d is not None:
self._set_image(img_d)
if self._last is not None:
disp = self._last / (self._resampling_factor ** self._levels)
else:
disp = np.array([0] * self._n_fields, dtype=np.float32)
for stage in reversed(self._stages):
disp *= self._resampling_factor
stage.set_disp(disp)
disp = stage.get_disp()
self._loops += 1
if not self._loops % 10:
self._debug(2,
f"Loop {self._loops}, values: {self._stages[0].dev_x.get()}"
f", res: {self._stages[0].res / 1e6}")
return disp
[docs]
def get_data_display(self) -> np.ndarray:
"""Returns the necessary data for displaying the difference between the
reference and current image."""
return self._stages[0].get_data_display()
[docs]
def get_res(self, lvl: int = 0):
"""Returns the last residual of the specified level."""
return self._stages[lvl].res
[docs]
@staticmethod
def clean():
"""Needs to be called at the end, to destroy the :mod:`pycuda` context
properly."""
GPUCorrelTool.context.pop()
def _get_fields(self,
y: Optional[int] = None,
x: Optional[int] = None) -> (Any, Any):
"""Returns the fields, resampled to size `(y, x)`."""
if x is None or y is None:
y, x = self._heights[0], self._widths[0]
out_x = gpuarray.empty((self._n_fields, y, x), np.float32)
out_y = gpuarray.empty((self._n_fields, y, x), np.float32)
grid = (int(ceil(x / 32)), int(ceil(y / 32)))
block = (int(ceil(x / grid[0])), int(ceil(y / grid[1])), 1)
for i, resample in enumerate(self._resample):
resample.prepared_call(grid, block,
out_x[i, :, :].gpudata,
out_y[i, :, :].gpudata,
np.int32(x), np.int32(y))
return out_x, out_y
def _debug(self, level: int, msg: str) -> None:
"""Displays the provided debug message only if its debug level is lower
than or equal to the verbose level."""
if self._logger is None:
self._logger = logging.getLogger(
f"{self._logger_name}.{type(self).__name__}")
if level <= self._verbose:
self._logger.log(logging.INFO, msg)
def _set_fields(self, fields: List[str]) -> None:
"""Computes the fields based on the provided field strings, and sets them
for each stage."""
for field, tex_fx, tex_fy in zip(fields, self._tex_fx, self._tex_fy):
# Getting the fields as numpy arrays
if isinstance(field, str):
field_x, field_y = get_field(field, self._heights[0], self._widths[0])
elif isinstance(field, np.ndarray):
field_x, field_y = field[:, :, 0], field[:, :, 1]
else:
raise TypeError("The provided fields should either be strings or "
"numpy arrays !")
tex_fx.set_array(pycuda.driver.matrix_to_array(field_x, 'C'))
tex_fy.set_array(pycuda.driver.matrix_to_array(field_y, 'C'))
# Setting the fields for each stage
for stage, height, width in zip(self._stages, self._heights, self._widths):
stage.set_fields(*self._get_fields(height, width))
def _set_image(self, img_d: np.ndarray) -> None:
"""Sets the current image for all the stages, to be compared with the
reference image."""
# Casting to float32 if needed
if img_d.dtype != np.float32:
warnings.warn(f"Correl() takes arrays with dtype np.float32 to allow GPU"
f" computing (got {img_d.dtype}). Converting to float32.",
RuntimeWarning)
img_d = img_d.astype(np.float32)
# Setting the current image for all stages
self._stages[0].set_image(img_d)
for prev_stage, stage, height, width in zip(self._stages[:-1],
self._stages[1:],
self._heights[1:],
self._widths[1:]):
stage.set_image(prev_stage.resample_d(height, width))
def _set_mask(self, mask: np.ndarray) -> None:
"""Sets for each field the mask for weighting the images to process."""
for stage, height, width in zip(self._stages, self._heights, self._widths):
stage.set_mask(interp_nearest(mask, height, width))