Source code for crappy.camera.raspberry_pi_camera_2

# coding: utf-8

import numpy as np
from typing import Any, Literal
import logging
from dataclasses import dataclass
from time import time, strftime, gmtime
from threading import RLock

from .meta_camera import Camera
from .._global import OptionalModule

try:
  from picamera2 import Picamera2
  from picamera2.encoders import Encoder
  from picamera2.outputs import Output
  from picamera2.request import MappedArray, CompletedRequest
  from picamera2.controls import ControlType
except (ModuleNotFoundError, ImportError):
  Picamera2 = OptionalModule("picamera2")
  Encoder = OptionalModule("picamera2")
  Output = OptionalModule("picamera2")
  MappedArray = OptionalModule("picamera2")
  CompletedRequest = OptionalModule("picamera2")

try:
  from libcamera import controls
  # Mapping of all the autofocus modes
  AUTO_FOCUS_MODE = {
    controls.AfModeEnum.Continuous: 'Continuous',
    controls.AfModeEnum.Manual: 'Manual',
    controls.AfModeEnum.Auto: 'Auto'}
  AUTO_FOCUS_MODE_INV = {
    val: key for key, val in AUTO_FOCUS_MODE.items()}
except (ModuleNotFoundError, ImportError):
  controls = OptionalModule("libcamera")
  AUTO_FOCUS_MODE = OptionalModule("libcamera")
  AUTO_FOCUS_MODE_INV = OptionalModule("libcamera")

try:
  import cv2
except (ModuleNotFoundError, ImportError):
  cv2 = OptionalModule("opencv-python")

# Pixel formats implemented in Crappy
PIXEL_FORMATS = ('YUV420', 'RGB888')

# Protect import to avoid raising exception when picamera2 is not installed
if not isinstance(Encoder, OptionalModule):


  class CrappyEncoder(Encoder):
    """Overloading of the :class:`picamera2.Encoder` class for the specific
    pipeline implemented in Crappy.

    Compared to the original class, passes a mapped array to the Output instead
    of a frame buffer, and also transmits all the metadata instead of just the
    timestamp.
    """

    def outputframe(self,
                    array: MappedArray,
                    metadata: dict[str, Any]) -> None:
      """Passes the mapped array of the captured frame and its metadata to the
      Output object."""

      # _output_lock might not be available depending on the version of
      # picamera2
      if hasattr(self, '_output_lock'):
        with self._output_lock:
          for out in self._output:
            out.outputframe(array, metadata)
      else:
        for out in self._output:
          out.outputframe(array, metadata)

    def _encode(self, stream, request: CompletedRequest) -> None:
      """Converts a captured image to a numpy array, extracts its metadata, and
      passes them to the Output."""

      # Probably useless here, but included for consistency with original
      # method
      if isinstance(stream, str):
        stream = request.stream_map[stream]

      # Gets the metadata and passes the frame as a mapped array
      metadata = request.get_metadata()
      with MappedArray(request, stream, reshape=True, write=True) as array:
        self.outputframe(array, metadata)


# Protect import to avoid raising exception when picamera2 is not installed
if not isinstance(Output, OptionalModule):


  class CrappyOutput(Output):
    """Overloading of the :class:`picamera2.Encoder` class for the specific
    pipeline implemented in Crappy.

    Each time a frame is received, shares it with the Camera object along with
    its metadata.
    """

    def __init__(self,
                 shared: dict[str, np.ndarray | None | dict[str, Any]],
                 lock: RLock) -> None:
      """Initializes the parent class and sets the arguments.

      Args:
        shared: a :obj:`dict` used for sharing the acquired frames and their
          metadata with the Camera object.
        lock: An :obj:`~threading.RLock` ensuring the Output and the Camera
          objects are not reading/writing in the shared :obj:`dict` at the same
          time.
      """

      super().__init__(pts=None)

      self._shared: dict[str, np.ndarray | None |dict[str, Any]] = shared
      self._frame_count: int = 0
      self._lock = lock

    def outputframe(self, array: MappedArray,
                    metadata: dict[str, Any]) -> None:
      """Shares the acquired frame and part of its metadata with the Camera
      object."""

      # Specify a limited set of metadata fields we're interested in
      to_retrieve = ('SensorTimestamp', 'ExposureTime',
                     'AnalogueGain', 'DigitalGain')

      with self._lock:
        # Place the acquired image in the shared dict
        self._shared['array'] = array.array.copy()
        # Place a subset of the metadata fields in the shared dict
        self._shared['metadata'] = {key: val for key, val in metadata.items()
                                    if key in to_retrieve and key in metadata}

        # Add a few extra fields to the metadata dictionary
        t = time()
        self._shared['metadata'] |= {'ImageUniqueID': self._frame_count,
                                     't(s)': t,
                                     'DateTimeOriginal':
                                     strftime("%Y:%m:%d %H:%M:%S", gmtime(t)),
                                     'SubsecTimeOriginal': f'{t % 1:.6f}'}
        self._frame_count += 1


@dataclass
class SensorMode:
  """Structure containing all the information about a sensor mode supported by
  the used camera."""
  
  format: str
  unpacked: str
  bit_depth: int
  size: tuple[int, int]
  fps: float
  crop_limits: tuple[int, int, int, int]
  exposure_limits: tuple[int, ...]
  
  @property
  def name(self) -> str:
    """Convenience property returning the name of the sensor mode, as displayed
    in the configuration window."""
    
    return (f"{self.size[0]}x{self.size[1]}, "
            f"{self.bit_depth}bits, {self.fps:.0f}fps")
    

[docs] class RaspberryPiCamera2(Camera): """:class:`~crappy.camera.Camera` object reading images from Raspberry Pi camera hardware, using the :mod:`picamera2` module. It is designed to interface seamlessly with any official Raspberry Pi camera module, and the other unofficial camera modules supported by :mod:`picamera2`. It can read images in color or grey level, in any of the video modes supported by the camera. Only the main camera settings are exposed, the more specific ones have been left out. Note: This class was only tested on PiCameraV3 and PiCameraHQ hardware, using a Raspberry Pi 5. Important: This class interfaces with the same hardware as :class:`~crappy.camera.RaspberryPiCamera`, but using an updated library. It is strongly recommended to use this class instead of the legacy one. .. versionadded:: 2.0.7 """
[docs] def __init__(self) -> None: """Sets the attributes and initializes the parent class.""" super().__init__() # Make sure there are cameras available to read if not (available := Picamera2.global_camera_info()): raise RuntimeError("No camera detected by picamera2, aborting !") # Display the available cameras in debug message available_msg = '\n'.join([f"camera n° {cam['Num']}: {cam['Model']}" for cam in available]) self.log(logging.DEBUG, f"Available cameras:{available_msg}") # Initialize objects to be used later self._cam: Picamera2 | None = None self._sensor_modes: list[SensorMode] | None = None self._encoder: Encoder | None = None self._output: Output | None = None self._current_sensor_mode: SensorMode | None = None self._started: bool = False # Initialize the objects used for sharing data with the Output self._shared: dict[str, np.ndarray | None | dict[str, Any]] = { 'array': None, 'metadata': dict()} self._lock: RLock = RLock() self._last_id: int = -1
[docs] def open(self, camera_num: int = 0, sensor_mode: str | None = None, pixel_format: Literal['YUV420', 'RGB888'] | None = None, grey_level_images: bool | None = None, auto_exposure: bool | None = None, auto_focus: Literal['Auto', 'Manual', 'Off'] | None = None, analog_gain: float | None = None, auto_white_balance: bool | None = None, brightness: float | None = None, contrast: float | None = None, exposure_time: int | None = None, auto_exposure_value: float | None = None, lens_position: float | None = None, saturation: float | None = None, sharpness: float | None = None, soft_roi_width: int | None = None, soft_roi_height: int | None = None, soft_roi_x: int | None = None, soft_roi_y: int | None = None) -> None: """Opens the connection to the camera, instantiates the available settings and starts the acquisition. Also sets custom values for the settings if provided by the user, otherwise sets them to their default. Args: camera_num: An :obj:`int` specifying the number of the camera to read, if several cameras are connected. sensor_mode: The sensor mode to use for image acquisition, as a :obj:`str`. The available options depend on the used hardware, and will have the format: ``'<width>x<height>, <framerate>fps``. pixel_format: The pixel format to use for image acquisition. Should be one of: :: 'YUV420', 'RGB888' No major difference was observed during tests between these two modes. The RGB mode should be preferred. grey_level_images: A :obj:`bool` indicating whether to convert the acquired images to gray level. The images are always acquired in color. auto_exposure: A :obj:`bool` indicating whether auto-exposure should be enabled, if supported by the camera. auto_focus: The autofocus mode to use if supported by the camera, as a :obj:`str`. Should be one of: :: 'Auto', 'Manual', 'Off' analog_gain: The analog gain to use for image acquisition if supported by the camera, as a :obj:`float`. The possible values depend on the used camera. auto_white_balance: A :obj:`bool` indicating whether to activate the auto white balance, if supported by the camera. brightness: The brightness correction to bring to the acquired images, as a :obj:`float` between `-1.0` and `1.0`. contrast: The contrast correction to bring to the acquired images, as a :obj:`float` between `0.0` and `32.0`. exposure_time: The exposure time for image acquisition in microseconds, as an :obj:`int` between `5000` and `1000000`. The limits set in Crappy correspond to usual settings on experimental setups, but are much narrower than the actual sensor limits. auto_exposure_value: A parameter allowing to adjust the exposure time when using auto-exposure, as a :obj:`float` between `-8.0` and `8.0`. lens_position: If supported by the camera, a :obj:`float` that sets the position of the motorized lens and allows adjusting the focus. The possible values depend on the used camera. saturation: The saturation correction to bring to the acquired images, as a :obj:`float` between `0.0` and `32.0`. sharpness: The sharpness correction to bring to the acquired images, as a :obj:`float` between `0.0` and `16.0`. soft_roi_width: The maximum width of the cropped image, when applying a software ROI. soft_roi_height: The maximum height of the cropped image, when applying a software ROI. soft_roi_x: The `x` offset of the cropped image on the original acquired frame, when applying a software ROI. soft_roi_y: The `y` offset of the cropped image on the original acquired frame, when applying a software ROI. """ # Opening the chosen camera self.log(logging.DEBUG, f"Opening the camera n°{camera_num}") self._cam = Picamera2(camera_num=camera_num) available_controls = self._cam.camera_controls # Listing all the available sensor modes for the opened camera self._sensor_modes = [SensorMode(mode['format'], mode['unpacked'], mode['bit_depth'], mode['size'], mode['fps'], mode['crop_limits'], mode['exposure_limits']) for mode in self._cam.sensor_modes] self.log(logging.DEBUG, f"Available sensor modes: {self._sensor_modes}") # Characteristics of the current sensor mode are needed later self._current_sensor_mode = self._get_mode_from_str( self._get_sensor_mode()) # These settings always have the same value self._cam.video_configuration.use_case = 'video' self._cam.video_configuration.buffer_count = 5 self._cam.video_configuration.display = None self._cam.video_configuration.encode = 'main' self._cam.video_configuration.queue = True self._cam.video_configuration.controls['FrameDurationLimits'] = (2500, 2000000) # Hardware mode parameters self.add_choice_setting('sensor_mode', [mode.name for mode in self._sensor_modes], self._get_sensor_mode, self._set_sensor_mode, self._get_sensor_mode()) self.add_choice_setting('pixel_format', PIXEL_FORMATS, self._get_pixel_format, self._set_pixel_format, self._get_pixel_format()) # Auto white balance parameters if 'AwbEnable' in available_controls: self.add_bool_setting('auto_white_balance', None, self._set_auto_white_balance, False) # Generic camera parameters that should be available most of the time if 'AnalogueGain' in available_controls: self.add_scale_setting('analog_gain', available_controls['AnalogueGain'][0], available_controls['AnalogueGain'][1], None, self._set_gain, available_controls['AnalogueGain'][0]) if 'Brightness' in available_controls: self.add_scale_setting('brightness', -1.0, 1.0, None, self._set_brightness, 0.0) if 'Contrast' in available_controls: self.add_scale_setting('contrast', 0.0, 32.0, None, self._set_contrast, 1.0) if 'Saturation' in available_controls: self.add_scale_setting('saturation', 0.0, 32.0, None, self._set_saturation, 1.0) if 'Sharpness' in available_controls: self.add_scale_setting('sharpness', 0.0, 16.0, None, self._set_sharpness, 1.0) # Exposure related parameters if 'AeEnable' in available_controls: self.add_bool_setting('auto_exposure', None, self._set_auto_exposure, False) if 'ExposureTime' in available_controls: low = self._current_sensor_mode.exposure_limits[0] high = self._current_sensor_mode.exposure_limits[1] self.add_scale_setting('exposure_time', max(low, 5000), min(high, 1000000), None, self._set_exposure_time, min(max(33333, low), high)) if 'ExposureValue' in available_controls: self.add_scale_setting('auto_exposure_value', -8.0, 8.0, None, self._set_auto_exposure_value, 0.0) # Focus related parameters if 'AfMode' in available_controls: self.add_choice_setting('auto_focus', AUTO_FOCUS_MODE.values(), None, self._set_auto_focus, 'Manual') if 'LensPosition' in available_controls: self.add_scale_setting('lens_position', available_controls['LensPosition'][0], available_controls['LensPosition'][1], None, self._set_lens_position, available_controls['LensPosition'][0]) # Parameters driving the post-processing performed in Crappy self.add_bool_setting('grey_level_images', None, None, False) self.add_software_roi(self._current_sensor_mode.size[0], self._current_sensor_mode.size[1]) # Collecting the kwargs to set and setting them to_set = {name: arg for name, arg in zip( ('sensor_mode', 'pixel_format', 'grey_level_images', 'auto_exposure', 'auto_focus', 'analog_gain', 'ROI_width', 'ROI_height', 'ROI_x', 'ROI_y', 'auto_white_balance', 'brightness', 'contrast', 'exposure_time', 'auto_exposure_value', 'lens_position', 'saturation', 'sharpness'), (sensor_mode, pixel_format, grey_level_images, auto_exposure, auto_focus, analog_gain, soft_roi_width, soft_roi_height, soft_roi_x, soft_roi_y, auto_white_balance, brightness, contrast, exposure_time, auto_exposure_value, lens_position, saturation, sharpness)) if arg is not None and name in self.settings} self.set_all(**to_set) # Applying the video configuration self.log(logging.DEBUG, "Applying the video configuration on the camera") self._cam.configure('video') # Defining custom Encoder and Output objects self._encoder = CrappyEncoder() self._output = CrappyOutput(shared=self._shared, lock = self._lock) # Starting the image acquisition self.log(logging.DEBUG, "Starting image acquisition") self._cam.start_recording(self._encoder, self._output) self._started = True
[docs] def get_image(self) -> tuple[dict[str, Any], np.ndarray] | None: """Grabs the latest image from the shared buffer, converts it to RGB and grey level if necessary, and returns it along with its metadata.""" # Accessing data using a lock to avoid simultaneous access with self._lock: # Only proceeding if there is a new image in the shared buffer if (self._shared['array'] is not None and self._shared['metadata'] and self._shared['metadata']['ImageUniqueID'] > self._last_id): self._last_id = self._shared['metadata']['ImageUniqueID'] # Dirty trick to avoid communication with hardware in this critical # section # Also, checking the shape of the array because some frames from the # previous setting might still be in the buffer if (self.settings['pixel_format']._value_no_getter == 'YUV420' and len(self._shared['array'].shape) == 2): # For YUV images, converting to RGB img = cv2.cvtColor(self._shared['array'], cv2.COLOR_YUV420p2RGB) else: img = self._shared['array'] # Converting the grey level if requested by the user if self.settings['grey_level_images'].value: img = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY) return self._shared['metadata'], img
[docs] def close(self) -> None: """Stops the image acquisition and closes the connection to the camera.""" if self._cam is not None: self.log(logging.DEBUG, "Stopping image acquisition") self._cam.stop_recording() self.log(logging.DEBUG, "Closing the connection to the camera") self._cam.close()
def _set_sensor_mode(self, mode_str: str) -> None: """""" # Get the SensorMode object corresponding to the requested mode mode = [mode for mode in self._sensor_modes if mode.name == mode_str][0] self._current_sensor_mode = mode pixel_format = self._get_pixel_format() # The sensor mode cannot be adjusted while the camera is running if self._started: self.log(logging.DEBUG, "Stopping image acquisition") self._cam.stop_recording() # The pixel format must be changed when updating the sensor mode, so doing # both at the same time self._cam.video_configuration.sensor = {'output_size': mode.size, 'bit_depth': mode.bit_depth} self._cam.video_configuration.main = {'format': pixel_format, 'size': mode.size} # The possible size of the software ROI must be updated when changing the # sensor mode self.reload_software_roi(mode.size[0], mode.size[1]) if self._started: # Re-applying the configuration to make the changes effective self._cam.configure('video') # Re-starting the image acquisition self.log(logging.DEBUG, "Starting image acquisition") self._cam.start_recording(self._encoder, self._output) def _set_pixel_format(self, pixel_format: str) -> None: """Sets the used pixel format to the desired value.""" # Handles the case when the camera is initially started with an unsupported # pixel format, like XRGB8888 if pixel_format not in PIXEL_FORMATS: pixel_format = 'RGB888' mode = self._current_sensor_mode # The pixel format cannot be adjusted while the camera is running if self._started: self.log(logging.DEBUG, "Stopping image acquisition") self._cam.stop_recording() # The mode size is needed for setting the pixel format, so configuring the # sensor mode at the same time self._cam.video_configuration.sensor = {'output_size': mode.size, 'bit_depth': mode.bit_depth} self._cam.video_configuration.main = {'format': pixel_format, 'size': mode.size} if self._started: # Re-applying the configuration to make the changes effective self._cam.configure('video') # Re-starting the image acquisition self.log(logging.DEBUG, "Starting image acquisition") self._cam.start_recording(self._encoder, self._output) def _set_auto_exposure(self, auto_exposure: bool) -> None: """Enables or disables the auto exposure feature.""" if not self._started: self._cam.video_configuration.controls['AeEnable'] = auto_exposure else: self._cam.set_controls({'AeEnable': auto_exposure}) def _set_auto_focus(self, mode: str) -> None: """Sets the autofocus mode of the camera to the desired value.""" mode_enum = AUTO_FOCUS_MODE_INV[mode] if not self._started: self._cam.video_configuration.controls['AfMode'] = mode_enum else: self._cam.set_controls({'AfMode': mode_enum}) def _set_gain(self, gain: float) -> None: """Sets the gain of the image to the desired value.""" if not self._started: self._cam.video_configuration.controls['AnalogueGain'] = gain else: self._cam.set_controls({'AnalogueGain': gain}) def _set_auto_white_balance(self, auto_white_balance: bool) -> None: """Enables or disables the auto white balance feature.""" if not self._started: self._cam.video_configuration.controls['AwbEnable'] = auto_white_balance else: self._cam.set_controls({'AwbEnable': auto_white_balance}) def _set_brightness(self, brightness: float) -> None: """Sets the brightness of the image to the desired value.""" if not self._started: self._cam.video_configuration.controls['Brightness'] = brightness else: self._cam.set_controls({'Brightness': brightness}) def _set_contrast(self, contrast: float) -> None: """Sets the contrast of the image to the desired value.""" if not self._started: self._cam.video_configuration.controls['Contrast'] = contrast else: self._cam.set_controls({'Contrast': contrast}) def _set_exposure_time(self, exposure: float) -> None: """Sets the exposure time of the camera to the desired value.""" if not self._started: self._cam.video_configuration.controls['ExposureTime'] = exposure else: self._cam.set_controls({'ExposureTime': exposure}) def _set_auto_exposure_value(self, value: float) -> None: """Sets the auto-exposure target to the desired value.""" if not self._started: self._cam.video_configuration.controls['ExposureValue'] = value else: self._cam.set_controls({'ExposureValue': value}) def _set_lens_position(self, position: float) -> None: """Sets the position of the motorized lens to the desired value.""" if not self._started: self._cam.video_configuration.controls['LensPosition'] = position else: self._cam.set_controls({'LensPosition': position}) def _set_saturation(self, saturation: float) -> None: """Sets the saturation of the image to the desired value.""" if not self._started: self._cam.video_configuration.controls['Saturation'] = saturation else: self._cam.set_controls({'Saturation': saturation}) def _set_sharpness(self, sharpness: float) -> None: """Sets the sharpness of the image to the desired value.""" if not self._started: self._cam.video_configuration.controls['Sharpness'] = sharpness else: self._cam.set_controls({'Sharpness': sharpness}) def _get_sensor_mode(self) -> str: """Returns the name of the current sensor mode.""" sensor_config = self._cam.video_configuration.sensor mode_str = (f"{sensor_config.output_size[0]}x" f"{sensor_config.output_size[1]}, " f"{sensor_config.bit_depth}bits") return self._get_mode_name_from_str(mode_str) def _get_pixel_format(self) -> Literal['YUV420', 'RGB888']: """Returns the name of the current pixel format.""" return self._cam.video_configuration.main.format def _get_mode_name_from_str(self, mode_str: str) -> str: """Convenience method returning the full name of a sensor mode using the beginning of its name.""" try: return [mode.name for mode in self._sensor_modes if mode.name.startswith(mode_str)][0] except IndexError: raise RuntimeError(f"Unhandled sensor mode read from camera: {mode_str}") def _get_mode_from_str(self, mode_str: str) -> SensorMode: """Convenience method returning a SensorMode object using the beginning of its name.""" try: return [mode for mode in self._sensor_modes if mode.name.startswith(mode_str)][0] except IndexError: raise RuntimeError(f"Unhandled sensor mode read from camera: {mode_str}")