Source code for

# coding: utf-8

from time import time, sleep
from typing import Tuple, Any, Optional
import numpy as np
from threading import Thread, RLock
import logging

from .meta_camera import Camera
from .._global import OptionalModule

  import cv2
except (ModuleNotFoundError, ImportError):
  cv2 = OptionalModule("opencv-python")

  from picamera import PiCamera as PiCameraRPi
  from picamera.array import PiRGBArray
except (ModuleNotFoundError, ImportError, OSError):
  PiCameraRPi = OptionalModule("picamera")

picamera_iso = [0, 100, 200, 320, 400, 500, 640, 800]

#   Update to picamera2 when available

[docs] class RaspberryPiCamera(Camera): """Class for reading images from a Raspberry Pi Camera. The RaspberryPiCamera Camera block is meant for reading images from a Raspberry Pi Camera. It uses the :mod:`picamera` module for capturing images, and :mod:`cv2` for converting BGR images to black and white. It can read images from the PiCamera V1, V2 and HQ models indifferently. Warning: Only works on Raspberry Pi, with the picamera API. On the latest OS release "Bullseye", it has to be specifically activated in the configuration menu. .. versionadded:: 1.4.0 .. versionchanged:: 2.0.0 renamed from *Picamera* to *RaspberryPiCamera* """
[docs] def __init__(self) -> None: """Instantiates the available settings.""" super().__init__() self._frame_grabber: Optional[Thread] = None self._capture = None self._cam = None self.log(logging.INFO, "Opening the connection to the camera") self._cam = PiCameraRPi() # Settings definition self.add_scale_setting('Width', 1, 3280, self._get_width, self._set_width, 1280) self.add_scale_setting('Height', 1, 2464, self._get_height, self._set_height, 720) self.add_scale_setting('Iso (0 for auto)', 0, 800, self._get_iso, self._set_iso, 0) self.add_scale_setting('Brightness', 0, 100, self._get_brightness, self._set_brightness, 50) self.add_scale_setting('Contrast', -100, 100, self._get_contrast, self._set_contrast, 0) self.add_scale_setting('Saturation', -100, 100, self._get_saturation, self._set_saturation, 0) self.add_scale_setting('Shutter speed (0 for auto)', 0, 30, self._get_shutter_speed, self._set_shutter_speed, 0) self.add_bool_setting('Black_and_white', self._get_black_white, self._set_black_white, True) self.add_scale_setting('Crop: X offset', 0.0, 1.0, self._get_crop_x_offset, self._set_crop_x_offset, 0.0) self.add_scale_setting('Crop: Y offset', 0.0, 1.0, self._get_crop_y_offset, self._set_crop_y_offset, 0.0) self.add_scale_setting('Crop: width', 0.0, 1.0, self._get_crop_width, self._set_crop_width, 1.0) self.add_scale_setting('Crop: height', 0.0, 1.0, self._get_crop_height, self._set_crop_height, 1.0) self._frame_grabber = Thread(target=self._grab_frame) self._lock = RLock() self._frame = None self._stop = False self._started = False self._stream = None
[docs] def open(self, **kwargs: Any) -> None: """Sets the settings to their default values and starts the image acquisition thread.""" self.set_all(**kwargs) # Starting the video stream self._capture = PiRGBArray(self._cam, (self._get_width(), self._get_height())) self.log(logging.INFO, "Starting the frame stream") self._stream = self._cam.capture_continuous(self._capture, format='bgr', use_video_port=True) self.log(logging.INFO, "Starting the frame grabber thread") self._frame_grabber.start() sleep(1) self._started = True
[docs] def get_image(self) -> Tuple[float, np.ndarray]: """Simply returns the last image in the acquisition buffer. The captured image is in GBR format, and converted into black and white if needed. Returns: The timeframe and the image. """ t = time() with self._lock: output = self._frame if self.Black_and_white: output = cv2.cvtColor(output, cv2.COLOR_BGR2GRAY) return t, output
[docs] def close(self) -> None: """Joins the image acquisition thread, and closes the stream and the :class:`picamera.PiCamera` object.""" self._stop = True if self._frame_grabber is not None: self.log(logging.INFO, "Stopping the frame grabber thread") self._frame_grabber.join(0.2) if self._frame_grabber.is_alive(): self.log(logging.WARNING, "The frame grabber thread didn't stop " "properly !") if self._capture is not None: self.log(logging.INFO, "Stopping the frame stream") self._capture.close() if self._cam is not None: self.log(logging.INFO, "Opening the connection to the camera") self._cam.close()
def _stop_stream(self) -> None: """Stops the video stream. Called before changing the image size.""" if not self._started: return self._stop = True self.log(logging.INFO, "Stopping the frame grabber thread") self._frame_grabber.join() self.log(logging.INFO, "Stopping the frame stream") self._capture.close() def _restart_stream(self) -> None: """Restarts the video stream. Called after a change in the image size.""" if not self._started: return self._stop = False self._frame_grabber = Thread(target=self._grab_frame) self._capture = PiRGBArray(self._cam, (self._get_width(), self._get_height())) self.log(logging.INFO, "Starting the frame stream") self._stream = self._cam.capture_continuous(self._capture, format='bgr', use_video_port=True) self.log(logging.INFO, "Starting the frame grabber thread") self._frame_grabber.start() sleep(1) def _grab_frame(self) -> None: """Target of a thread for grabbing the last image in the video stream and putting it in a buffer.""" for frame in self._stream: with self._lock: self.log(logging.DEBUG, "Got new frame from stream") self._frame = frame.array self._capture.truncate(0) if self._stop: break def _get_width(self) -> int: return self._cam.resolution[0] def _get_height(self) -> int: return self._cam.resolution[1] def _get_iso(self) -> int: return self._cam.iso def _get_brightness(self) -> int: return self._cam.brightness def _get_contrast(self) -> int: return self._cam.contrast def _get_saturation(self) -> int: return self._cam.saturation def _get_shutter_speed(self) -> float: return self._cam.shutter_speed def _get_black_white(self) -> bool: return self._cam.color_effects == (128, 128) def _get_crop_x_offset(self) -> float: return self._cam.zoom[0] def _get_crop_y_offset(self) -> float: return self._cam.zoom[1] def _get_crop_width(self) -> float: return self._cam.zoom[2] def _get_crop_height(self) -> float: return self._cam.zoom[3] def _set_width(self, width: float) -> None: # The Raspberry Pi Camera only accepts width that are multiples of 32 self._stop_stream() self._cam.resolution = (32 * (width // 32), self._get_height()) self._restart_stream() def _set_height(self, height: float) -> None: # The Raspberry Pi Camera only accepts heights that are multiples of 32 self._stop_stream() self._cam.resolution = (self._get_width(), 32 * (height // 32)) self._restart_stream() def _set_iso(self, iso: float) -> None: # The Raspberry Pi Camera only accepts a limited range of iso values self._cam.iso = min(picamera_iso, key=lambda x: abs(x - iso)) def _set_brightness(self, brightness: float) -> None: self._cam.brightness = brightness def _set_contrast(self, contrast: float) -> None: self._cam.contrast = contrast def _set_saturation(self, saturation: float) -> None: self._cam.saturation = saturation def _set_shutter_speed(self, shutter_speed: float) -> None: self._cam.shutter_speed = shutter_speed def _set_black_white(self, boolean: bool) -> None: if boolean: self._cam.color_effects = (128, 128) else: self._cam.color_effects = None def _set_crop_x_offset(self, x_offset: float) -> None: self._cam.zoom = (x_offset, self._get_crop_y_offset(), self._get_crop_width(), self._get_crop_height()) def _set_crop_y_offset(self, y_offset: float) -> None: self._cam.zoom = (self._get_crop_x_offset(), y_offset, self._get_crop_width(), self._get_crop_height()) def _set_crop_width(self, width: float) -> None: self._cam.zoom = (self._get_crop_x_offset(), self._get_crop_y_offset(), width, self._get_crop_height()) def _set_crop_height(self, height: float) -> None: self._cam.zoom = (self._get_crop_x_offset(), self._get_crop_y_offset(), self._get_crop_width(), height)