# coding: utf-8
from threading import Thread
from math import log2, ceil
import numpy as np
from typing import Optional
from collections.abc import Iterable
from time import time, sleep
import logging
import logging.handlers
from .camera_process import CameraProcess
from ..._global import OptionalModule
from ...tool.camera_config import Overlay
plt = OptionalModule('matplotlib.pyplot', lazy_import=True)
try:
import cv2
except (ModuleNotFoundError, ImportError):
cv2 = OptionalModule("opencv-python")
[docs]
class Displayer(CameraProcess):
"""This :class:`~crappy.blocks.camera_processes.CameraProcess` can display
images acquired by a :class:`~crappy.blocks.Camera` Block in a dedicated
window.
It is meant to serve as a control or validation feature, its resolution is
thus limited to `640x480` and it should not be used at high framerates. On
top of the displayed image, it can also draw
:class:`~crappy.tool.camera_config.config_tools.Overlay` objects sent by
other :class:`~crappy.blocks.camera_processes.CameraProcess`. This way, the
user can for example visualize the spots being tracked by the
:class:`~crappy.blocks.VideoExtenso` Block in real time.
The images can be displayed using two different backends : either using
:mod:`cv2` (OpenCV), or using :mod:`matplotlib`. OpenCV is by far the fastest
and most convenient.
.. versionadded:: 2.0.0
"""
[docs]
def __init__(self,
title: str,
framerate: float,
backend: Optional[str] = None) -> None:
"""Sets the arguments and initializes the parent class.
Args:
title: The name of the Displayer window, that will be displayed on the
window border.
framerate: The target framerate for the display. The actual achieved
framerate might be lower, but never greater than this value.
backend: The module to use for displaying the images. Can be either
``'cv2'`` or ``'mpl'``, to use respectively :mod:`cv2` or
:mod:`matplotlib`.
"""
# The thread must be initialized later for compatibility with Windows
self._overlay_thread: Optional[Thread] = None
self._overlay: Iterable[Overlay] = list()
self._stop_thread = False
super().__init__()
self._title = title
self._framerate = framerate
# Selecting the backend if no backend was specified
if backend is None:
if not isinstance(cv2, OptionalModule):
self._backend = 'cv2'
else:
try:
_ = plt.Figure
self._backend = 'mpl'
except RuntimeError:
raise ModuleNotFoundError("Neither opencv-python nor matplotlib "
"could be imported, no backend found for "
"displaying the images")
elif backend in ('cv2', 'mpl'):
self._backend = backend
else:
raise ValueError("The backend argument should be either 'cv2' or "
"'mpl' !")
# Setting other instance attributes
self._ax = None
self._fig = None
self._last_upd = time()
[docs]
def __del__(self) -> None:
"""On exit, ensuring that the :obj:`~threading.Thread` in charge of
grabbing the :class:`~crappy.tool.camera_config.config_tools.Overlay` to
display has stopped, otherwise stopping it."""
if self._overlay_thread is not None and self._overlay_thread.is_alive():
self._stop_thread = True
try:
self._overlay_thread.join(0.05)
except RuntimeError:
pass
[docs]
def init(self) -> None:
"""Starts the :obj:`~threading.Thread` for grabbing the
:class:`~crappy.tool.camera_config.config_tools.Overlay` to display, and
initializes the Displayer window."""
# Instantiating and starting the Thread for grabbing the Overlays
self.log(logging.INFO, "Instantiating the thread for getting the Overlays"
" to display")
self._overlay_thread = Thread(target=self._thread_target)
self.log(logging.INFO, "Starting the thread for getting the Overlays to "
"display")
self._overlay_thread.start()
# Preparing the Displayer window
self.log(logging.INFO, f"Opening the displayer window with the backend "
f"{self._backend}")
if self._backend == 'cv2':
self._prepare_cv2()
elif self._backend == 'mpl':
self._prepare_mpl()
def _get_data(self) -> bool:
"""Method similar to the one of the parent class, except it also ensures
that the achieved framerate stays within the limit specified by the user.
Returns:
:obj:`True` in case a frame was acquired and needs to be handled, or
:obj:`False` if no frame was grabbed and nothing should be done.
"""
# Acquiring the Lock to avoid conflicts with other CameraProcesses
with self._lock:
# In case there's no frame grabbed yet
if 'ImageUniqueID' not in self._data_dict:
return False
# In case the frame in buffer was already handled during a previous loop,
# or it's too early to grab a new frame because of the target framerate
if self._data_dict['ImageUniqueID'] == self.metadata['ImageUniqueID'] \
or time() - self._last_upd < 1 / self._framerate:
return False
# Copying the metadata
self.metadata = self._data_dict.copy()
self._last_upd = time()
self.log(logging.DEBUG, f"Got new image to process with id "
f"{self.metadata['ImageUniqueID']}")
# Copying the frame
np.copyto(self.img,
np.frombuffer(self._img_array.get_obj(),
dtype=self._dtype).reshape(self._shape))
return True
[docs]
def loop(self) -> None:
"""This method grabs the latest frame, casts it to 8 bits if necessary,
and updates the Displayer window to draw it.
It also draws the latest received
:class:`~crappy.tool.camera_config.config_tools.Overlay` on top of the
displayed frame.
"""
# Casting the image to uint8 if it's not already in this format
if self.img.dtype != np.uint8:
self.log(logging.DEBUG, f"Casting displayed image from "
f"{self.img.dtype} to uint8")
if max_pix := int(np.max(self.img)) > 255:
factor = max(ceil(log2(max_pix + 1) - 8), 0)
img = (self.img / 2 ** factor).astype(np.uint8)
else:
img = self.img.astype(np.uint8)
else:
img = self.img.copy()
# Drawing the latest known overlay
for overlay in self._overlay:
if overlay is not None:
self.log(logging.DEBUG, f"Drawing {overlay} on top of the image to "
"display")
overlay.draw(img)
# Calling the right update method
if self._backend == 'cv2':
self._update_cv2(img)
elif self._backend == 'mpl':
self._update_mpl(img)
[docs]
def finish(self) -> None:
"""Closes the Displayer window and stops the :obj:`~threading.Thread`
grabbing the :class:`~crappy.tool.camera_config.config_tools.Overlay`"""
# Closing the Displayer window
self.log(logging.INFO, "Closing the displayer window")
if self._backend == 'cv2':
self._finish_cv2()
elif self._backend == 'mpl':
self._finish_mpl()
# Stooping the Thread grabbing the Overlay to draw
if self._overlay_thread is not None and self._overlay_thread.is_alive():
self._stop_thread = True
try:
self._overlay_thread.join(0.05)
except RuntimeError:
self.log(logging.WARNING, "Thread for receiving the Overlay did not "
"stop as expected")
def _thread_target(self) -> None:
"""This method is the target to the :obj:`~threading.Thread` in charge of
grabbing the :class:`~crappy.tool.camera_config.config_tools.Overlay` to
draw on top of the displayed image.
It repeatedly polls the :obj:`~multiprocessing.Connection` through which
the Overlays are received, and stores the last received Overlays.
"""
# Looping until the entire CameraProcess is told to stop, or the
# _stop_thread flag is raised
while not self._stop_event.is_set() and not self._stop_thread:
# Receiving the latest Overlay to draw
overlay = None
while self._to_draw_conn.poll():
overlay = self._to_draw_conn.recv()
# Saving the received Overlay
if overlay is not None:
self.log(logging.DEBUG, f"Received overlay to display: {overlay}")
self._overlay = overlay
# To avoid spamming the CPU in vain
else:
sleep(0.001)
self.log(logging.INFO, "Thread for receiving the Overlays ended")
def _prepare_cv2(self) -> None:
"""Instantiates the display window of :mod:`cv2`."""
try:
flags = cv2.WINDOW_NORMAL | cv2.WINDOW_KEEPRATIO
except AttributeError:
flags = cv2.WINDOW_NORMAL
cv2.namedWindow(self._title, flags)
def _prepare_mpl(self) -> None:
"""Creates a :mod:`matplotlib` Figure."""
plt.ion()
self._fig, self._ax = plt.subplots()
def _update_cv2(self, img: np.ndarray) -> None:
"""Reshapes the image to a maximum shape of 640x480 and displays it in
:mod:`cv2`."""
if img.shape[0] > 480 or img.shape[1] > 640:
factor = min(480 / img.shape[0], 640 / img.shape[1])
self.log(logging.DEBUG,
f"Reshaping displayed image from {img.shape} to "
f"{int(img.shape[1] * factor), int(img.shape[0] * factor)}")
img = cv2.resize(img, (int(img.shape[1] * factor),
int(img.shape[0] * factor)))
self.log(logging.DEBUG, "Displaying the image")
cv2.imshow(self._title, img)
cv2.waitKey(1)
def _update_mpl(self, img: np.ndarray) -> None:
"""Reshapes the image to a dimension inferior or equal to 640x480 and
displays it in :mod:`matplotlib`."""
if img.shape[0] > 480 or img.shape[1] > 640:
factor = max(ceil(img.shape[0] / 480), ceil(img.shape[1] / 640))
self.log(logging.DEBUG,
f"Reshaping the displayed image from {img.shape} to "
f"{(img.shape[0] / factor, img.shape[1] / factor)}")
img = img[::factor, ::factor]
self._ax.clear()
self.log(logging.DEBUG, "Displaying the image")
self._ax.imshow(img, cmap='gray')
plt.pause(0.001)
plt.show()
def _finish_cv2(self) -> None:
"""Destroys the opened :mod:`cv2` window."""
if self._title is not None:
cv2.destroyWindow(self._title)
def _finish_mpl(self) -> None:
"""Destroys the opened :mod:`matplotlib` window."""
if self._fig is not None:
plt.close(self._fig)