Source code for crappy.tool.image_processing.video_extenso.video_extenso
# coding: utf-8
from multiprocessing import Pipe, current_process
from multiprocessing.connection import Connection
from multiprocessing.queues import Queue
from typing import Optional, Tuple, List, Union
import numpy as np
from itertools import combinations
from time import sleep, time
import logging
import logging.handlers
from select import select
from platform import system
from ...camera_config import SpotsBoxes, Box
from .tracker import Tracker, LostSpotError
[docs]
class VideoExtensoTool:
"""This class is the core of the :class:`~crappy.blocks.VideoExtenso` Block.
It performs spot tracking on up to `4` spots on the images acquired by the
:class:`~crappy.camera.Camera`, and computes the strain values at each new
image. For each spot, the tracking is performed by an independent
:class:`~crappy.tool.image_processing.video_extenso.tracker.tracker.Tracker`
Process.
It is possible to track only one spot, in which case only the position of its
center is returned and the strain values are left to `0`.
.. versionadded:: 1.4.0
.. versionchanged:: 1.5.10 renamed from *Video_extenso* to *VideoExtenso*
.. versionchanged:: 2.0.0 renamed from *VideoExtenso* to *VideoExtensoTool*
"""
[docs]
def __init__(self,
spots: SpotsBoxes,
thresh: int,
log_level: Optional[int],
log_queue: Queue,
white_spots: bool = False,
update_thresh: bool = False,
safe_mode: bool = False,
border: int = 5,
blur: Optional[int] = 5) -> None:
"""Sets the arguments and the other instance attributes.
Args:
spots: An instance of the
:class:`~crappy.tool.camera_config.config_tools.SpotsBoxes` tool
containing the coordinates of the spots to track.
.. versionadded:: 2.0.0
thresh: The grey level value of the threshold to use for discriminating
spots from the background, as an :obj:int`. Passed to the
:class:`~crappy.tool.image_processing.video_extenso.tracker.Tracker`
and not used in this class.
.. versionadded:: 2.0.0
log_level: The minimum logging level of the entire Crappy script, as an
:obj:`int`.
.. versionadded:: 2.0.0
log_queue: A :obj:`multiprocessing.Queue` for sending the log messages to
the main :obj:`~logging.Logger`, only used in Windows.
.. versionadded:: 2.0.0
white_spots: If :obj:`True`, detects white objects over a black
background, else black objects over a white background. Passed to the
:class:`~crappy.tool.image_processing.video_extenso.tracker.Tracker`
and not used in this class.
update_thresh: If :obj:`True`, the grey level threshold for detecting the
spots is re-calculated at each new image. Otherwise, the first
calculated threshold is kept for the entire test. The spots are less
likely to be lost with adaptive threshold, but the measurement will be
more noisy. Adaptive threshold may also yield inconsistent results when
spots are lost. Passed to the
:class:`~crappy.tool.image_processing.video_extenso.tracker.Tracker`
and not used in this class.
safe_mode: If :obj:`True`, the class will stop and raise an exception as
soon as overlapping spots are detected. Otherwise, it will first try to
reduce the detection window to get rid of overlapping. This argument
should be used when inconsistency in the results may have critical
consequences.
border: When searching for the new position of a spot, the class will
search in the last known bounding box of this spot plus a few
additional pixels in each direction. This argument sets the number of
additional pixels to use. It should be greater than the expected
"speed" of the spots, in pixels / frame. But if it's set too high,
noise or other spots might hinder the detection. Passed to the
:class:`~crappy.tool.image_processing.video_extenso.tracker.Tracker`
and not used in this class.
blur: The size in pixels (as an odd :obj:`int` greater than `1`) of the
kernel to use when applying a median blur filter to the image before
the spot detection. If not given, no blurring is performed. A slight
blur improves the spot detection by smoothening the noise, but also
takes a bit more time compared to no blurring. Passed to the
:class:`~crappy.tool.image_processing.video_extenso.tracker.Tracker`
and not used in this class.
.. versionremoved:: 2.0.0 *num_spots* and *min_area* arguments
"""
# These attributes will be used later
self._consecutive_overlaps = 0
self._trackers = list()
self._pipes = list()
# Setting the args
self._white_spots = white_spots
self._update_thresh = update_thresh
self._safe_mode = safe_mode
self._border = border
self._blur = blur
self.spots = spots
self._thresh = thresh
self._logger: Optional[logging.Logger] = None
self._log_level = log_level
self._log_queue = log_queue
self._last_warn = time()
self._system = system()
def __del__(self) -> None:
"""Security to ensure there are no zombie processes left when exiting."""
self.stop_tracking()
[docs]
def start_tracking(self) -> None:
"""Creates a
:class:`~crappy.tool.image_processing.video_extenso.tracker.Tracker`
Process for each detected spot, and starts it.
Also creates a :obj:`multiprocessing.Pipe` for each spot to communicate
with the Tracker process.
"""
if self.spots.empty():
raise AttributeError("No spots selected, aborting !")
for spot in self.spots:
if spot is None:
continue
inlet, outlet = Pipe()
tracker = Tracker(pipe=outlet,
logger_name=f"{current_process().name}."
f"{type(self).__name__}",
log_level=self._log_level,
log_queue=self._log_queue,
white_spots=self._white_spots,
thresh=None if self._update_thresh else self._thresh,
blur=self._blur)
self._pipes.append(inlet)
self._trackers.append(tracker)
tracker.start()
[docs]
def stop_tracking(self) -> None:
"""Stops all the active
:class:`~crappy.tool.image_processing.video_extenso.tracker.Tracker`
Processes, either gently or by terminating them if they don't stop by
themselves."""
if any((tracker.is_alive() for tracker in self._trackers)):
# First, gently asking the trackers to stop
for pipe, tracker in zip(self._pipes, self._trackers):
if tracker.is_alive():
pipe.send(('stop', 'stop', 'stop'))
sleep(0.1)
# If they're not stopping, killing the trackers
for tracker in self._trackers:
if tracker.is_alive():
self._log(logging.WARNING, "Tracker process did not stop properly, "
"terminating it")
tracker.terminate()
[docs]
def get_data(self,
img: np.ndarray
) -> Optional[Tuple[List[Tuple[float, ...]], float, float]]:
"""Takes an image as an input, performs spot detection on it, computes the
strain from the newly detected spots, and returns the spot positions and
strain values.
Args:
img: The image on which the spots should be detected.
Returns:
A :obj:`list` containing :obj:`tuple` with the coordinates of the centers
of the detected spots, and the calculated x and y strain values.
.. versionchanged:: 1.5.10 renamed from *get_def* to *get_data*
"""
# Sending the latest sub-image containing the spot to track
# Also sending the coordinates of the top left pixel
for pipe, spot in zip(self._pipes, self.spots):
x_top, x_bottom, y_left, y_right = spot.sorted()
slice_y = slice(max(0, y_left - self._border),
min(img.shape[0], y_right + self._border))
slice_x = slice(max(0, x_top - self._border),
min(img.shape[1], x_bottom + self._border))
pipe.send((slice_y.start, slice_x.start, img[slice_y, slice_x]))
for i, (pipe, spot) in enumerate(zip(self._pipes, self.spots)):
# Receiving the data from the tracker, if there's any
if pipe.poll(timeout=0.1):
box = pipe.recv()
# In case a tracker faced an error, stopping them all and raising
if isinstance(box, str):
self.stop_tracking()
self._log(logging.ERROR, "Tracker process returned exception !")
raise LostSpotError
self.spots[i] = box
overlap = False
# Checking if the newly received boxes overlaps with each other
for box_1, box_2 in combinations(self.spots, 2):
if box_1 is None or box_2 is None:
continue
if self._overlap_box(box_1, box_2):
# If there's overlapping in safe mode, raising directly
if self._safe_mode:
self.stop_tracking()
self._log(logging.ERROR, "Overlapping detected in safe mode, "
"raising exception")
raise LostSpotError
self._log(logging.WARNING, "Overlapping detected ! Reducing spot "
"window")
# If we're not in safe mode, simply reduce the boxes by 1 pixel
# Also, make sure the box is not being reduced too much
overlap = True
x_top_1, x_bottom_1, y_left_1, y_right_1 = box_1.sorted()
x_top_2, x_bottom_2, y_left_2, y_right_2 = box_2.sorted()
box_1.x_start = min(x_top_1 + 1, box_1.x_centroid - 2)
box_1.y_start = min(y_left_1 + 1, box_1.y_centroid - 2)
box_1.x_end = max(x_bottom_1 - 1, box_1.x_centroid + 2)
box_1.y_end = max(y_right_1 - 1, box_1.y_centroid + 2)
box_2.x_start = min(x_top_2 + 1, box_2.x_centroid - 2)
box_2.y_start = min(y_left_2 + 1, box_2.y_centroid - 2)
box_2.x_end = max(x_bottom_2 - 1, box_2.x_centroid + 2)
box_2.y_end = max(y_right_2 - 1, box_2.y_centroid + 2)
if overlap:
self._consecutive_overlaps += 1
if self._consecutive_overlaps > 10:
self._log(logging.ERROR, "Too many consecutive overlaps !")
raise LostSpotError
else:
self._consecutive_overlaps = 0
# If there are multiple spots, the x and y strains can be computed
if len(self.spots) > 1:
x = [spot.x_centroid for spot in self.spots if spot is not None]
y = [spot.y_centroid for spot in self.spots if spot is not None]
# The strain is calculated based on the positions of the extreme
# spots in each direction
try:
exx = ((max(x) - min(x)) / self.spots.x_l0 - 1) * 100
except ZeroDivisionError:
exx = 0
try:
eyy = ((max(y) - min(y)) / self.spots.y_l0 - 1) * 100
except ZeroDivisionError:
eyy = 0
centers = list(zip(y, x))
return centers, eyy, exx
# If only one spot was detected, the strain isn't computed
else:
x = self.spots[0].x_centroid
y = self.spots[0].y_centroid
return [(y, x)], 0, 0
def _log(self, level: int, msg: str) -> None:
"""Wrapper for recording log messages.
Also instantiates the :obj:`~logging.Logger` on the first message.
Args:
level: The logging level of the message, as an :obj:`int`.
msg: The message to lof, as a :obj:`str`.
"""
if self._logger is None:
self._logger = logging.getLogger(
f"{current_process().name}.{type(self).__name__}")
self._logger.log(level, msg)
def _send(self,
conn: Connection,
val: Union[str, Tuple[int, int, np.ndarray]]) -> None:
"""Wrapper for sending messages to the Tracker processes.
In Linux, checks that the Pipe is not full before sending the message.
Args:
conn: The Connection to use for sending the message.
val: The message to send to the Tracker process.
"""
if self._system == 'Linux':
if select([], [conn], [], 0)[1]:
conn.send(val)
elif time() - self._last_warn > 1:
self._last_warn = time()
self._log(logging.WARNING, f"Cannot send the image to process to the"
f" Tracker process, the Pipe is full !")
else:
conn.send(val)
@staticmethod
def _overlap_box(box_1: Box, box_2: Box) -> bool:
"""Determines whether two boxes are overlapping or not."""
x_min_1, x_max_1, y_min_1, y_max_1 = box_1.sorted()
x_min_2, x_max_2, y_min_2, y_max_2 = box_2.sorted()
return max((min(x_max_1, x_max_2) - max(x_min_1, x_min_2)), 0) * max(
(min(y_max_1, y_max_2) - max(y_min_1, y_min_2)), 0) > 0
@staticmethod
def _overlap_bbox(prop_1, prop_2) -> bool:
"""Determines whether two bboxes are overlapping or not."""
y_min_1, x_min_1, y_max_1, x_max_1 = prop_1.bbox
y_min_2, x_min_2, y_max_2, x_max_2 = prop_2.bbox
return max((min(x_max_1, x_max_2) - max(x_min_1, x_min_2)), 0) * max(
(min(y_max_1, y_max_2) - max(y_min_1, y_min_2)), 0) > 0