Source code for crappy.blocks.camera_processes.camera_process

# coding: utf-8

from multiprocessing import Process, managers, get_start_method, \
  current_process
from multiprocessing.synchronize import Event, RLock, Barrier
from multiprocessing.sharedctypes import SynchronizedArray
from multiprocessing.connection import Connection
from multiprocessing.queues import Queue
from threading import BrokenBarrierError
import numpy as np
from typing import Optional, Tuple, List, Union, Dict, Any, Iterable
import logging
import logging.handlers
from select import select
from time import time
from platform import system

from ...links import Link
from ..._global import LinkDataError
from ...tool.camera_config import Overlay


[docs] class CameraProcess(Process): """This :obj:`~multiprocessing.Process` is the base class for all the helper Processes of the :class:`~crappy.blocks.Camera` Block. It defines a base architecture that all the children classes use, and on top of which they add the specific actions they perform. This class is not meant to be instantiated as is, it should always be subclassed. This class and its children can be seen as a tool for the :class:`~crappy.blocks.Camera` Block and its children. Several instances can be started by the Camera Block, each instance performing a different action with the acquired images. This allows to parallelize image acquisition, display, processing and recording on multiple CPU cores, to increase the achieved FPS. The Camera Block performs the acquisition, and makes the latest captured image available to all the CameraProcess :obj:`~multiprocessing.Process` through an :obj:`~multiprocessing.Array`. They are then free to run at their own rhythm, but are sure to always grab the latest available frame. The instantiation, startup and termination of the CameraProcesses is all managed by the parent :class:`~crappy.blocks.Camera` Block, depending on the provided arguments. Users should normally not need to call this class themselves. .. versionadded:: 2.0.0 """
[docs] def __init__(self) -> None: """Initializes the parent class and all the instance attributes.""" super().__init__() self.name = f"{current_process().name}.{type(self).__name__}" self._system = system() # Logging-related objects self._log_queue: Optional[Queue] = None self._logger: Optional[logging.Logger] = None self._log_level: Optional[int] = None # These objects will be shared later by the Camera Block self._img_array: Optional[SynchronizedArray] = None self._data_dict: Optional[managers.DictProxy] = None self._lock: Optional[RLock] = None self._cam_barrier: Optional[Barrier] = None self._stop_event: Optional[Event] = None self._shape: Optional[Tuple[int, int]] = None self._to_draw_conn: Optional[Connection] = None self._outputs: List[Link] = list() self._labels: List[str] = list() self.img: Optional[np.ndarray] = None self._dtype = None self.metadata = {'ImageUniqueID': None} self._img0_set = False # Other attribute for internal use self._last_warn = time() self.fps_count = 0 self._display_freq: Optional[bool] = None self._last_fps = time()
[docs] def set_shared(self, array: SynchronizedArray, data_dict: managers.DictProxy, lock: RLock, barrier: Barrier, event: Event, shape: Union[Tuple[int, int], Tuple[int, int, int]], dtype, to_draw_conn: Optional[Connection], outputs: List[Link], labels: Optional[List[str]], log_queue: Queue, log_level: Optional[int] = 20, display_freq: bool = False) -> None: """Method allowing the :class:`~crappy.blocks.Camera` Block to share :mod:`multiprocessing` synchronization objects with this class. Args: array: The :obj:`~multiprocessing.Array` containing the last frame acquired by the Camera Block. data_dict: A :obj:`dict` managed by a :obj:`~multiprocessing.Manager` and containing the metadata of the last acquired frame. lock: A :obj:`~multiprocessing.RLock` ensuring that the CameraProcess and the Camera Block do not try to access the shared array at the same time. barrier: A :obj:`~multiprocessing.Barrier` ensuring that all the CameraProcesses wait for a start signal from the Camera Block before starting to run. event: A :obj:`~multiprocessing.Event` indicating to the CameraProcess when to stop running. It is either set by the Camera Block, or by a CameraProcess. shape: The expected shape of the image, as a :obj:`tuple`. It is necessary as the frames are shared as a one-dimensional array. dtype: The expected dtype of the image. It is necessary for reconstructing the image from the one-dimensional shared array. to_draw_conn: A :obj:`~multiprocessing.Connection` for sending or receiving :class:`~crappy.tool.camera_config.config_tools.Overlay` objects to draw on top of the displayed image. outputs: The :class:`~crappy.links.Link` objects for sending data to downstream Blocks. They are the same as those owned by the Camera Block. labels: The labels to use when sending data to downstream Blocks. log_queue: A :obj:`~multiprocessing.Queue` for sending the log messages to the main :obj:`~logging.Logger`, only used in Windows. log_level: The minimum logging level of the entire Crappy script, as an :obj:`int`. display_freq: If :obj:`True`, the looping frequency of this class will be displayed while running. """ self._img_array = array self._data_dict = data_dict self._lock = lock self._cam_barrier = barrier self._stop_event = event self._shape = shape self._dtype = dtype self._to_draw_conn = to_draw_conn self._outputs = outputs self._labels = labels # Logging related attributes self._log_queue = log_queue self._log_level = log_level self._display_freq = display_freq self.img = np.empty(shape=shape, dtype=dtype)
[docs] def run(self) -> None: """This method is the core of the :obj:`~multiprocessing.Process`. It starts by initializing the :obj:`~logging.Logger`, and then performs any additional action required before processing images. Once all the CameraProcesses are ready, it loops forever and processes images until told to stop or an exception is raised. And finally, it performs any action required for properly exiting. This method is quite similar to the :meth:`~crappy.blocks.Block.run` method of the :class:`~crappy.blocks.Block`, although it is much simpler. """ try: # First thing, setting the Logger self._set_logger() self.log(logging.INFO, "Logger configured") # Initializing the CameraProcess, and breaking the Barrier to warn the # other CameraProcesses in case something goes wrong try: self.init() except (Exception,): self._cam_barrier.abort() self.log(logging.ERROR, "Breaking the barrier due to caught exception" " while preparing") raise # Waiting for all other CameraProcess to be ready self.log(logging.INFO, "Waiting for the other Camera processes to be " "ready") self._cam_barrier.wait() self.log(logging.INFO, "All Camera processes ready now") self._last_fps = time() # Looping forever until told to stop or an exception is raised while not self._stop_event.is_set(): # Only looping if a new image is available if self._get_data(): self.log(logging.DEBUG, "Running the loop method") self.loop() self.fps_count += 1 # Displaying the looping frequency is required if self._display_freq: t = time() if t - self._last_fps > 2: self.log(logging.INFO, f"Images processed /s: " f"{self.fps_count / (t - self._last_fps)}") self._last_fps = t self.fps_count = 0 self.log(logging.INFO, "Stop event set, stopping the processing") # Case when CTRL+C was pressed except KeyboardInterrupt: self.log(logging.INFO, "KeyboardInterrupt caught, stopping the " "processing") # Case when another CameraProcess raised an exception while initializing except BrokenBarrierError: self.log(logging.WARNING, "Exception raised in another Camera process while waiting " "for all Camera processes to be ready, stopping") # Handling any other unexpected exception except (Exception,) as exc: self._logger.exception("Exception caught wile running !", exc_info=exc) self.log(logging.ERROR, "Setting the stop event to stop the other " "Camera processes") self._stop_event.set() raise # Always calling finish in the end finally: self.finish()
[docs] def init(self) -> None: """This method should perform any action required for initializing the CameraProcess. It is meant to be overwritten by children classes, at is otherwise does not perform any action. It is called right after the :obj:`~multiprocessing.Process` starts, and when the images haven't started to be acquired yet. """ ...
[docs] def loop(self) -> None: """This method is the main loop of the CameraProcess. It is called repeatedly until the :obj:`~multiprocessing.Process` is told to stop. It should perform the desired action for handling the latest available frame, stored in the *self.img* attribute. The latest available metadata containing at least the timestamp and frame index of the latest image is stored in *self.metadata*. This method is meant to be overwritten by children classes, at is otherwise does not perform any action. """ ...
[docs] def finish(self) -> None: """This method should perform any action required for properly exiting the CameraProcess. It is meant to be overwritten by children classes, at is otherwise does not perform any action. It is the last method called before the :obj:`~multiprocessing.Process` ends, and at that point no more images are being acquired. """ ...
[docs] def send(self, data: Optional[Union[Dict[str, Any], Iterable[Any]]]) -> None: """This method allows sending data to downstream Blocks. It is similar to the :meth:`~crappy.blocks.Block.send` method of the :class:`~crappy.blocks.Block`. It accepts data either as a :obj:`dict`, or as an iterable of values. """ # Just in case, not handling non-existing data if data is None: return # Case when the data to send is not given as a dict if not isinstance(data, dict): # First, checking that labels are provided if self._labels is None or not self._labels: self._logger.log(logging.ERROR, "Trying to send data as an iterable, " "but no labels are specified !") raise LinkDataError # Trying to convert iterable data to dict using the given labels try: self._logger.log(logging.DEBUG, f"Converting {data} to dict before " f"sending") data = dict(zip(self._labels, data)) except TypeError: self._logger.log(logging.ERROR, f"Cannot convert data to send (of type" f" {type(data)}) to dict ! Please " f"ensure that the data is given as an " f"iterable, as well as the labels.") # Sending the data to the downstream Blocks for link in self._outputs: self._logger.log(logging.DEBUG, f"Sending {data} to Link {link.name}") link.send(data)
[docs] def send_to_draw(self, to_draw: Iterable[Overlay]) -> None: """This method sends a collection of :class:`~crappy.tool.camera_config.config_tools.Overlay` objects to the :class:`~crappy.blocks.camera_processes.Displayer` CameraProcess. The overlays are sent by the CameraProcess performing the image processing, so that the area(s) of interest can be displayed simultaneously. """ # Not sending if there's no Connection to send data through if self._to_draw_conn is None: return self.log(logging.DEBUG, "Sending the overlays to the displayer process") # Sending the overlay if self._system == 'Linux': if select([], [self._to_draw_conn], [], 0)[1]: # Can only check on Linux if a pipe is full self._to_draw_conn.send(to_draw) elif time() - self._last_warn > 1: # Warning in case the pipe is full self._last_warn = time() self.log(logging.WARNING, f"Cannot send the overlay to draw to the " f"Displayer process, the Pipe is full !") else: self._to_draw_conn.send(to_draw)
[docs] def log(self, level: int, msg: str) -> None: """Sends a log message to the :obj:`~logging.Logger`. Args: level: The logging level, as an :obj:`int`. msg: The message to log, as a :obj:`str`. """ if self._logger is None: return self._logger.log(level, msg)
def _get_data(self) -> bool: """This method allows to grab the latest available frame. It first acquired the :obj:`~multiprocessing.RLock` protecting the shared :obj:`~multiprocessing.Array` containing the image, then copies the image locally and releases the Lock. It also copies the metadata associated to the image. Returns: :obj:`True` in case a frame was acquired and needs to be handled, or :obj:`False` if no frame was grabbed and nothing should be done. """ # Acquiring the Lock to avoid conflicts with other CameraProcesses with self._lock: # In case there's no frame grabbed yet if 'ImageUniqueID' not in self._data_dict: return False # In case the frame in buffer was already handled during a previous loop if self._data_dict['ImageUniqueID'] == self.metadata['ImageUniqueID']: return False # Copying the metadata self.metadata = self._data_dict.copy() self.log(logging.DEBUG, f"Got new image to process with id " f"{self.metadata['ImageUniqueID']}") # Copying the frame np.copyto(self.img, np.frombuffer(self._img_array.get_obj(), dtype=self._dtype).reshape(self._shape)) return True def _set_logger(self) -> None: """Initializes the :obj:`~logging.Logger` for the CameraProcess. If :func:`multiprocessing.get_start_method` is `'spawn'` (mostly Windows and Mac for Python < 3.12), redirects the log messages to a :obj:`multiprocessing.Queue` sending them to the main Process. """ logger = logging.getLogger(self.name) # Disabling logging if requested if self._log_level is not None: logger.setLevel(self._log_level) else: logging.disable() # On Windows, the messages need to be sent through a Queue for logging if get_start_method() == "spawn" and self._log_level is not None: queue_handler = logging.handlers.QueueHandler(self._log_queue) queue_handler.setLevel(min(self._log_level, logging.INFO)) logger.addHandler(queue_handler) self._logger = logger