# Source code for crappy.blocks.camera_processes.display

# coding: utf-8

from threading import Thread
from math import log2, ceil
import numpy as np
from typing import Optional, Iterable
from time import time, sleep
import logging
import logging.handlers

from .camera_process import CameraProcess
from ..._global import OptionalModule
from ...tool.camera_config import Overlay

# matplotlib is wrapped in an OptionalModule with lazy_import=True rather than
# imported directly. NOTE(review): presumably this defers the real import until
# first use - confirm against OptionalModule. Displayer.__init__ probes
# plt.Figure and treats a RuntimeError as "matplotlib is not available".
plt = OptionalModule('matplotlib.pyplot', lazy_import=True)

# OpenCV is imported eagerly. If the import fails, cv2 is replaced with an
# OptionalModule placeholder, which Displayer.__init__ detects through
# isinstance(cv2, OptionalModule) when choosing the default backend.
try:
  import cv2
except (ModuleNotFoundError, ImportError):
  cv2 = OptionalModule("opencv-python")


class Displayer(CameraProcess):
  """This :class:`~crappy.blocks.camera_processes.CameraProcess` can display
  images acquired by a :class:`~crappy.blocks.Camera` Block in a dedicated
  window.

  It is meant to serve as a control or validation feature, its resolution is
  thus limited to `640x480` and it should not be used at high framerates.

  On top of the displayed image, it can also draw
  :class:`~crappy.tool.camera_config.config_tools.Overlay` objects sent by
  other :class:`~crappy.blocks.camera_processes.CameraProcess`. This way, the
  user can for example visualize the spots being tracked by the
  :class:`~crappy.blocks.VideoExtenso` Block in real time.

  The images can be displayed using two different backends : either using
  :mod:`cv2` (OpenCV), or using :mod:`matplotlib`. OpenCV is by far the
  fastest and most convenient.

  .. versionadded:: 2.0.0
  """

  def __init__(self,
               title: str,
               framerate: float,
               backend: Optional[str] = None) -> None:
    """Sets the arguments and initializes the parent class.

    Args:
      title: The name of the Displayer window, that will be displayed on the
        window border.
      framerate: The target framerate for the display. The actual achieved
        framerate might be lower, but never greater than this value.
      backend: The module to use for displaying the images. Can be either
        ``'cv2'`` or ``'mpl'``, to use respectively :mod:`cv2` or
        :mod:`matplotlib`. If :obj:`None`, :mod:`cv2` is used when it could
        be imported, otherwise :mod:`matplotlib`.

    Raises:
      ModuleNotFoundError: If no backend was specified and neither
        :mod:`cv2` nor :mod:`matplotlib` is available.
      ValueError: If the given backend is neither ``'cv2'`` nor ``'mpl'``.
    """

    # The thread must be initialized later for compatibility with Windows
    self._overlay_thread: Optional[Thread] = None
    self._overlay: Iterable[Overlay] = list()
    self._stop_thread = False

    super().__init__()

    self._title = title
    self._framerate = framerate

    # Selecting the backend if no backend was specified
    if backend is None:
      if not isinstance(cv2, OptionalModule):
        self._backend = 'cv2'
      else:
        # Accessing an attribute of plt is how the availability of
        # matplotlib is probed, a RuntimeError means it is missing
        try:
          _ = plt.Figure
          self._backend = 'mpl'
        except RuntimeError:
          raise ModuleNotFoundError("Neither opencv-python nor matplotlib "
                                    "could be imported, no backend found for "
                                    "displaying the images")
    elif backend in ('cv2', 'mpl'):
      self._backend = backend
    else:
      raise ValueError("The backend argument should be either 'cv2' or "
                       "'mpl' !")

    # Setting other instance attributes
    self._ax = None
    self._fig = None
    self._last_upd = time()

  def __del__(self) -> None:
    """On exit, ensuring that the :obj:`~threading.Thread` in charge of
    grabbing the :class:`~crappy.tool.camera_config.config_tools.Overlay` to
    display has stopped, otherwise stopping it."""

    if self._overlay_thread is not None and self._overlay_thread.is_alive():
      self._stop_thread = True
      # Best effort only, nothing more can be done at this point
      try:
        self._overlay_thread.join(0.05)
      except RuntimeError:
        pass

  def init(self) -> None:
    """Starts the :obj:`~threading.Thread` for grabbing the
    :class:`~crappy.tool.camera_config.config_tools.Overlay` to display, and
    initializes the Displayer window."""

    # Instantiating and starting the Thread for grabbing the Overlays
    self.log(logging.INFO, "Instantiating the thread for getting the Overlays"
                           " to display")
    self._overlay_thread = Thread(target=self._thread_target)
    self.log(logging.INFO, "Starting the thread for getting the Overlays to "
                           "display")
    self._overlay_thread.start()

    # Preparing the Displayer window
    self.log(logging.INFO, f"Opening the displayer window with the backend "
                           f"{self._backend}")
    if self._backend == 'cv2':
      self._prepare_cv2()
    elif self._backend == 'mpl':
      self._prepare_mpl()

  def _get_data(self) -> bool:
    """Method similar to the one of the parent class, except it also ensures
    that the achieved framerate stays within the limit specified by the user.

    Returns:
      :obj:`True` in case a frame was acquired and needs to be handled, or
      :obj:`False` if no frame was grabbed and nothing should be done.
    """

    # Acquiring the Lock to avoid conflicts with other CameraProcesses
    with self._lock:

      # In case there's no frame grabbed yet
      if 'ImageUniqueID' not in self._data_dict:
        return False

      # In case the frame in buffer was already handled during a previous
      # loop, or it's too early to grab a new frame because of the target
      # framerate
      if self._data_dict['ImageUniqueID'] == self.metadata['ImageUniqueID'] \
          or time() - self._last_upd < 1 / self._framerate:
        return False

      # Copying the metadata
      self.metadata = self._data_dict.copy()
      self._last_upd = time()

      self.log(logging.DEBUG, f"Got new image to process with id "
                              f"{self.metadata['ImageUniqueID']}")

      # Copying the frame
      np.copyto(self.img,
                np.frombuffer(self._img_array.get_obj(),
                              dtype=self._dtype).reshape(self._shape))

    return True

  def loop(self) -> None:
    """This method grabs the latest frame, casts it to 8 bits if necessary,
    and updates the Displayer window to draw it.

    It also draws the latest received
    :class:`~crappy.tool.camera_config.config_tools.Overlay` on top of the
    displayed frame.
    """

    # Casting the image to uint8 if it's not already in this format
    if self.img.dtype != np.uint8:
      self.log(logging.DEBUG, f"Casting displayed image from "
                              f"{self.img.dtype} to uint8")

      # The maximum is converted to a Python float before adding 1 to it.
      # Otherwise, with the NumPy 2 promotion rules, a saturated integer
      # image (e.g. uint16 with a maximum of 65535) would wrap around to 0
      # and log2() would raise a ValueError
      max_val = float(np.max(self.img))
      if max_val > 255:
        # Number of bits to drop for the maximum to fit in 8 bits
        factor = max(ceil(log2(max_val + 1) - 8), 0)
        img = (self.img / 2 ** factor).astype(np.uint8)
      else:
        img = self.img.astype(np.uint8)
    else:
      # Working on a copy so that the overlays are not drawn on self.img
      img = self.img.copy()

    # Drawing the latest known overlay
    for overlay in self._overlay:
      if overlay is not None:
        self.log(logging.DEBUG, f"Drawing {overlay} on top of the image to "
                                "display")
        overlay.draw(img)

    # Calling the right update method
    if self._backend == 'cv2':
      self._update_cv2(img)
    elif self._backend == 'mpl':
      self._update_mpl(img)

  def finish(self) -> None:
    """Closes the Displayer window and stops the :obj:`~threading.Thread`
    grabbing the :class:`~crappy.tool.camera_config.config_tools.Overlay`"""

    # Closing the Displayer window
    self.log(logging.INFO, "Closing the displayer window")
    if self._backend == 'cv2':
      self._finish_cv2()
    elif self._backend == 'mpl':
      self._finish_mpl()

    # Stopping the Thread grabbing the Overlay to draw
    if self._overlay_thread is not None and self._overlay_thread.is_alive():
      self._stop_thread = True
      try:
        self._overlay_thread.join(0.05)
      except RuntimeError:
        pass

      # join() returns silently when its timeout expires, so the only way to
      # know if the Thread actually stopped is to check if it's still alive
      if self._overlay_thread.is_alive():
        self.log(logging.WARNING, "Thread for receiving the Overlay did not "
                                  "stop as expected")

  def _thread_target(self) -> None:
    """This method is the target to the :obj:`~threading.Thread` in charge of
    grabbing the :class:`~crappy.tool.camera_config.config_tools.Overlay` to
    draw on top of the displayed image.

    It repeatedly polls the :obj:`~multiprocessing.Connection` through which
    the Overlays are received, and stores the last received Overlays.
    """

    # Looping until the entire CameraProcess is told to stop, or the
    # _stop_thread flag is raised
    while not self._stop_event.is_set() and not self._stop_thread:

      # Receiving the latest Overlay to draw, older ones are discarded
      overlay = None
      while self._to_draw_conn.poll():
        overlay = self._to_draw_conn.recv()

      # Saving the received Overlay
      if overlay is not None:
        self.log(logging.DEBUG, f"Received overlay to display: {overlay}")
        self._overlay = overlay

      # To avoid spamming the CPU in vain
      else:
        sleep(0.001)

    self.log(logging.INFO, "Thread for receiving the Overlays ended")

  def _prepare_cv2(self) -> None:
    """Instantiates the display window of :mod:`cv2`."""

    # WINDOW_KEEPRATIO is used only if this version of cv2 exposes it
    try:
      flags = cv2.WINDOW_NORMAL | cv2.WINDOW_KEEPRATIO
    except AttributeError:
      flags = cv2.WINDOW_NORMAL
    cv2.namedWindow(self._title, flags)

  def _prepare_mpl(self) -> None:
    """Creates a :mod:`matplotlib` Figure."""

    plt.ion()
    self._fig, self._ax = plt.subplots()

  def _update_cv2(self, img: np.ndarray) -> None:
    """Reshapes the image to a maximum shape of 640x480 and displays it in
    :mod:`cv2`.

    Args:
      img: The image to display, already cast to 8 bits.
    """

    if img.shape[0] > 480 or img.shape[1] > 640:
      factor = min(480 / img.shape[0], 640 / img.shape[1])
      # Each dimension is kept at 1 pixel at least, as a very elongated image
      # could otherwise get a null dimension that cv2.resize rejects
      new_size = (max(int(img.shape[1] * factor), 1),
                  max(int(img.shape[0] * factor), 1))
      self.log(logging.DEBUG,
               f"Reshaping displayed image from {img.shape} to "
               f"{new_size}")
      img = cv2.resize(img, new_size)

    self.log(logging.DEBUG, "Displaying the image")
    cv2.imshow(self._title, img)
    cv2.waitKey(1)

  def _update_mpl(self, img: np.ndarray) -> None:
    """Reshapes the image to a dimension inferior or equal to 640x480 and
    displays it in :mod:`matplotlib`.

    Args:
      img: The image to display, already cast to 8 bits.
    """

    if img.shape[0] > 480 or img.shape[1] > 640:
      # Integer decimation step, keeping one pixel out of factor
      factor = max(ceil(img.shape[0] / 480), ceil(img.shape[1] / 640))
      reduced = img[::factor, ::factor]
      # Logging the shape actually obtained after decimation
      self.log(logging.DEBUG,
               f"Reshaping the displayed image from {img.shape} to "
               f"{reduced.shape}")
      img = reduced

    self._ax.clear()
    self.log(logging.DEBUG, "Displaying the image")
    self._ax.imshow(img, cmap='gray')
    plt.pause(0.001)
    plt.show()

  def _finish_cv2(self) -> None:
    """Destroys the opened :mod:`cv2` window."""

    if self._title is not None:
      cv2.destroyWindow(self._title)

  def _finish_mpl(self) -> None:
    """Destroys the opened :mod:`matplotlib` window."""

    if self._fig is not None:
      plt.close(self._fig)