# Source code for crappy.tool.image_processing.video_extenso.tracker

# coding: utf-8

from multiprocessing import Process, get_start_method
from multiprocessing.connection import Connection
from multiprocessing.queues import Queue
import numpy as np
from typing import Optional, Union
from time import time
from select import select
import logging
import logging.handlers
from platform import system

from ...camera_config import Box
from ...._global import OptionalModule

try:
  import cv2
except (ModuleNotFoundError, ImportError):
  cv2 = OptionalModule("opencv-python")
try:
  from skimage.filters import threshold_otsu
except (ModuleNotFoundError, ImportError):
  threshold_otsu = OptionalModule("skimage", "Please install scikit-image to "
                                             "use Video-extenso")


class LostSpotError(Exception):
  """Exception raised when a spot is lost, or when there's too much
  overlapping."""
class Tracker(Process):
  """:obj:`multiprocessing.Process` whose task is to track a spot on an image.

  It receives a subframe centered on the last known position of the spot, and
  returns the updated position of the detected spot.

  It is meant to be used in association with the
  :class:`~crappy.tool.image_processing.video_extenso.VideoExtensoTool`.

  .. versionadded:: 2.0.0
  """

  # Names already handed out by get_name(), stored at the class level so that
  # every new Tracker can pick a name that is not in use yet
  names = list()

  def __init__(self,
               pipe: Connection,
               logger_name: str,
               log_level: Optional[int],
               log_queue: Queue,
               white_spots: bool = False,
               thresh: Optional[int] = None,
               blur: Optional[float] = 5) -> None:
    """Sets the arguments.

    Args:
      pipe: The :obj:`~multiprocessing.connection.Connection` object through
        which the image is received and the updated coordinates of the spot
        are sent back.
      logger_name: The name of the parent :obj:`~logging.Logger` as a
        :obj:`str`, used for naming the Logger in this class.
      log_level: The minimum logging level of the entire Crappy script, as an
        :obj:`int`. If :obj:`None`, logging is disabled in this process.
      log_queue: A :obj:`multiprocessing.Queue` for sending the log messages
        to the main :obj:`~logging.Logger`. Only used when the processes are
        started with the ``spawn`` method (e.g. on Windows).
      white_spots: If :obj:`True`, detects white objects on a black
        background, else black objects on a white background.
      thresh: If given, this threshold value will always be used for
        isolating the spot from the background. If not given (:obj:`None`), a
        new threshold is recalculated for each new subframe. Spots are less
        likely to be lost with an adaptive threshold, but it takes a bit more
        time.
      blur: If not :obj:`None`, the subframe is first blurred before trying
        to detect the spot. This argument gives the size of the kernel to use
        for blurring. Values not greater than 1 disable blurring. Non-integer
        or even values are converted to a valid odd integer kernel size
        before being used. Better results are obtained with blurring, but it
        takes a bit more time.
    """

    super().__init__()
    self.name = self.get_name(logger_name, type(self).__name__)

    self._system = system()

    self._pipe = pipe
    self._white_spots = white_spots
    self._thresh = thresh
    self._blur = blur

    # The Logger is only instantiated in run(), i.e. in the child process
    self._logger: Optional[logging.Logger] = None
    self._log_level = log_level
    self._log_queue = log_queue

    self._n = 0
    self._last_warn = time()

  @classmethod
  def get_name(cls, logger_name: str, self_name: str) -> str:
    """Method for naming the Tracker processes so that each of them has a
    unique name.

    Args:
      logger_name: The name of the parent :obj:`~logging.Logger`, as a
        :obj:`str`.
      self_name: The name of the current class.

    Returns:
      The chosen name for the current Tracker process, as a :obj:`str`.
    """

    # Incrementing the suffix until reaching a name that is not taken yet
    i = 1
    name = f"{logger_name}.{self_name}-{i}"
    while name in cls.names:
      i += 1
      name = f"{logger_name}.{self_name}-{i}"

    cls.names.append(name)
    return name

  def run(self) -> None:
    """Continuously reads incoming subframes, tries to detect a spot and sends
    back the coordinates of the detected spot.

    Can only be stopped either with a :exc:`KeyboardInterrupt` or when
    receiving a text message from the
    :class:`~crappy.tool.image_processing.video_extenso.VideoExtensoTool`.
    If any other exception is raised while tracking, it is logged, the
    message ``'stop'`` is sent back through the pipe, and the process ends.
    """

    try:
      self._set_logger()

      # Looping forever for receiving data
      while True:
        # Making sure the call to recv is not blocking
        if not self._pipe.poll(0.5):
          continue

        y_start, x_start, img = self._pipe.recv()
        self._log(logging.DEBUG, "Received data from pipe")
        self._n += 1

        # If a string is received, always means the process has to stop
        if isinstance(img, str):
          break

        # Simply sending back the new Box containing the spot
        try:
          self._log(logging.DEBUG, "Sending back data through pipe")
          self._send(self._evaluate(x_start, y_start, img))

        # If the caught exception is a KeyboardInterrupt, simply stopping
        except KeyboardInterrupt:
          self._log(logging.INFO, "Caught KeyboardInterrupt, stopping the "
                                  "process")
          break

        # Sending back the exception if anything else unexpected happened
        except (Exception,) as exc:
          self._logger.exception("Caught exception while tracking spot",
                                 exc_info=exc)
          self._send('stop')
          break

    # In case the user presses CTRL+C, simply stopping the process
    except KeyboardInterrupt:
      self._log(logging.INFO,
                "Caught KeyboardInterrupt, stopping the process")

  @staticmethod
  def _median_kernel(blur: Optional[float]) -> Optional[int]:
    """Converts the requested blur size into a kernel size that is valid for
    :func:`cv2.medianBlur`.

    Args:
      blur: The requested kernel size, or :obj:`None`.

    Returns:
      :obj:`None` if no blurring should be performed (``blur`` is :obj:`None`
      or not greater than 1), else an odd :obj:`int` greater than or equal to
      3. Odd integers greater than 1 are returned unchanged.
    """

    if blur is None or blur <= 1:
      return None

    # The median filter only accepts odd integer kernel sizes greater than 1
    kernel = int(blur)
    if kernel % 2 == 0:
      kernel += 1
    return max(kernel, 3)

  def _binarize(self, img: np.ndarray, thresh: float) -> np.ndarray:
    """Returns a ``uint8`` mask in which the pixels belonging to the spot are
    set to 1, and all the other ones to 0.

    Args:
      img: The subframe to apply the threshold on.
      thresh: The threshold separating the spot from the background.
    """

    # White spots are strictly above the threshold, black ones are at or
    # below it
    if self._white_spots:
      return (img > thresh).astype('uint8')
    return (img <= thresh).astype('uint8')

  def _evaluate(self, x_start: int, y_start: int, img: np.ndarray) -> 'Box':
    """Takes a sub-image, applies a threshold on it and tries to detect the
    new position of the spot.

    Args:
      x_start: The x position of the top left pixel of the subframe on the
        entire image.
      y_start: The y position of the top left pixel of the subframe on the
        entire image.
      img: The subframe on which to search for a spot.

    Returns:
      A :class:`~crappy.tool.camera_config.config_tools.Box` object containing
      the x and y start and end positions of the detected spot, as well as the
      coordinates of the centroid.

    Raises:
      LostSpotError: If the detected spot covers less than 10% of the
        subframe, even with an adaptive threshold.
      ZeroDivisionError: If the moment of order 0, 0 of the thresholded
        subframe is zero.
    """

    # First, blurring the image if asked to
    kernel = self._median_kernel(self._blur)
    if kernel is not None:
      img = cv2.medianBlur(img, kernel)

    # Determining the best threshold for the image if required
    adaptive = self._thresh is None
    thresh = threshold_otsu(img) if adaptive else self._thresh

    # Getting all pixels superior or inferior to threshold
    black_white = self._binarize(img, thresh)

    # Checking that the detected spot is large enough
    min_pixels = 0.1 * img.size
    if np.count_nonzero(black_white) < min_pixels:

      # If an adaptive threshold is already used, nothing more can be done
      if adaptive:
        self._log(logging.ERROR,
                  "Couldn't detect spot with adaptive threshold, aborting !")
        raise LostSpotError

      # If the threshold is pre-defined, trying again with an updated one
      self._log(logging.WARNING,
                "Detected spot too small compared with overall box size, "
                "recalculating threshold")
      black_white = self._binarize(img, threshold_otsu(img))

      # If the spot still cannot be detected, aborting
      if np.count_nonzero(black_white) < min_pixels:
        self._log(logging.ERROR,
                  "Couldn't detect spot with adaptive threshold, aborting !")
        raise LostSpotError

    # Calculating the coordinates of the centroid using the image moments
    moments = cv2.moments(black_white)
    try:
      x = moments['m10'] / moments['m00']
      y = moments['m01'] / moments['m00']
    except ZeroDivisionError as exc:
      raise ZeroDivisionError("Couldn't compute the centroid because the "
                              "moment of order 0, 0 is zero !") from exc

    # Getting the updated centroid and coordinates of the spot
    x_min, y_min, width, height = cv2.boundingRect(black_white)
    return Box(x_start=x_start + x_min,
               y_start=y_start + y_min,
               x_end=x_start + x_min + width,
               y_end=y_start + y_min + height,
               x_centroid=x_start + x,
               y_centroid=y_start + y)

  def _send(self, val: Union['Box', str]) -> None:
    """Sends a message to the
    :class:`~crappy.tool.image_processing.video_extenso.VideoExtensoTool`, and
    in Linux checks that the :obj:`multiprocessing.Pipe` is not full before
    sending.

    On Linux, if the pipe is not ready for writing the message is dropped,
    and a warning is logged at most once per second.

    Args:
      val: The object to send through the pipe.
    """

    # On other systems than Linux, the message is sent unconditionally
    if self._system != 'Linux':
      self._pipe.send(val)
      return

    # Only sending if the pipe is immediately ready for writing
    if select([], [self._pipe], [], 0)[1]:
      self._pipe.send(val)
    elif time() - self._last_warn > 1:
      self._last_warn = time()
      self._log(logging.WARNING, "Cannot send the detected spot to the "
                                 "VideoExtenso tool, the Pipe is full !")

  def _set_logger(self) -> None:
    """Instantiates and sets up the logger for the instance."""

    logger = logging.getLogger(self.name)

    # Disabling logging if requested
    if self._log_level is not None:
      logger.setLevel(self._log_level)
    else:
      logging.disable()

    # When processes are spawned (e.g. on Windows), the messages need to be
    # sent through a Queue for logging
    if get_start_method() == "spawn" and self._log_level is not None:
      queue_handler = logging.handlers.QueueHandler(self._log_queue)
      queue_handler.setLevel(self._log_level)
      logger.addHandler(queue_handler)

    self._logger = logger

  def _log(self, level: int, msg: str) -> None:
    """Sends a log message to the :obj:`~logging.Logger`.

    Nothing is done if the Logger was not instantiated yet.

    Args:
      level: The logging level, as an :obj:`int`.
      msg: The message to log, as a :obj:`str`.
    """

    if self._logger is None:
      return
    self._logger.log(level, msg)