Source code for crappy.camera.gstreamer_camera_basic

# coding: utf-8

from time import time, sleep
from numpy import uint8, ndarray, uint16, copy, squeeze
from typing import Tuple, Optional, Union, List
from re import findall, search
from subprocess import Popen, PIPE, run
from fractions import Fraction
from platform import system
import logging

from .meta_camera import Camera
from .._global import OptionalModule

try:
  import cv2
except (ImportError, ModuleNotFoundError):
  cv2 = OptionalModule('opencv-python')

try:
  from gi import require_version
  require_version('Gst', '1.0')
  require_version('GstApp', '1.0')
  from gi.repository import Gst, GstApp
except (ImportError, ModuleNotFoundError, ValueError):
  Gst = GstApp = OptionalModule('PyGObject')


class CameraGstreamer(Camera):
  """A class for reading images from a video device using GStreamer.

  It can read images from the default video source, or a video device can be
  specified. The user has access to a range of parameters for tuning the
  image, depending on the capabilities of the hardware. Alternatively, it is
  possible to give a custom GStreamer pipeline as an argument. In that case,
  no settings are available, and it is up to the user to ensure the validity
  of the pipeline.

  This class uses fewer resources and is compatible with more cameras than
  the :class:`~crappy.camera.CameraOpencv` camera, which relies on OpenCV.
  The installation of GStreamer is however less straightforward than that of
  OpenCV, especially on Windows !

  Warning:
    There are two classes for CameraGstreamer: one for Linux, based on
    `v4l-utils`, and another one for Linux without `v4l-utils` and for other
    OSes. Depending on the installation of `v4l-utils` and on the OS, the
    correct class is automatically imported. The version using `v4l-utils`
    allows tuning more parameters than the basic version.

  Note:
    This Camera requires the module :mod:`PyGObject` to be installed, as
    well as GStreamer.

  .. versionadded:: 1.5.9
  .. versionchanged:: 2.0.0
     renamed from *Camera_gstreamer* to *CameraGstreamer*
  """
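  # Usage sketch (not part of the class): within a Crappy script, this
  # Camera is normally driven through the Camera Block rather than
  # instantiated directly. The exact Block arguments below are assumptions
  # given for illustration only, refer to the crappy.blocks.Camera
  # documentation:
  #
  #   import crappy
  #
  #   cam = crappy.blocks.Camera(camera='CameraGstreamer',
  #                              device='/dev/video0',  # hypothetical device
  #                              display_images=True)
  #
  #   crappy.start()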
  def __init__(self) -> None:
    """Simply initializes the instance attributes."""

    super().__init__()
    Gst.init(None)

    self._last_frame_nr = 0
    self._frame_nr = 0

    # These attributes will be set later
    self._pipeline = None
    self._process: Optional[Popen] = None
    self._img: Optional[ndarray] = None
    self._device: Optional[Union[str, int]] = None
    self._user_pipeline: Optional[str] = None
    self._nb_channels: int = 3
    self._img_depth: int = 8
    self._formats: List[str] = list()
    self._app_sink = None
  def open(self,
           device: Optional[Union[int, str]] = None,
           user_pipeline: Optional[str] = None,
           nb_channels: Optional[int] = None,
           img_depth: Optional[int] = None,
           **kwargs) -> None:
    """Opens the pipeline, sets the settings and starts the acquisition of
    images.

    A custom pipeline can be specified using the ``user_pipeline`` argument.
    Simply copy-paste the pipeline as it is in the terminal, and the class
    will take care of adjusting it to make it compatible with the Python
    binding of GStreamer.

    Note:
      For custom pipelines, it may be necessary to set a format directly in
      the pipeline (which isn't the case in the terminal). For instance,
      this line

      .. code-block:: shell-session

        gst-launch-1.0 autovideosrc ! videoconvert ! autovideosink

      may need to become

      .. code-block:: shell-session

        gst-launch-1.0 autovideosrc ! videoconvert ! \
        video/x-raw,format=BGR ! autovideosink

    Note:
      Pipelines built using a pipe for redirecting the stream from another
      application are also accepted. For example, the following user
      pipeline is valid :

      .. code-block:: shell-session

        gphoto2 --stdout --capture-movie | gst-launch-1.0 fdsrc fd=0 ! \
        videoconvert ! autovideosink

    Args:
      device: The video device to open, if the device opened by default
        isn't the right one. On Linux, it should be a path like
        `/dev/video0`. On Windows and Mac, it should be the index of the
        video device. This argument is ignored if a ``user_pipeline`` is
        given.
      user_pipeline: A custom pipeline that can optionally be given as a
        :obj:`str`. If given, the ``device`` argument is ignored. The
        pipeline should be given as it would be in a terminal.
      nb_channels: The number of channels expected in the acquired images,
        in case a custom pipeline is given. Otherwise, this argument is
        ignored. For now, Crappy only manages 1- and 3-channel images.
      img_depth: The bit depth of each channel of the acquired images, in
        case a custom pipeline is given. Otherwise, this argument is
        ignored. For now, Crappy only manages 8- and 16-bit deep images.
      **kwargs: Allows specifying values for the settings even before
        displaying the configuration window.
    """

    # Checking the validity of the arguments
    if device is not None and system() == 'Linux' and not isinstance(device,
                                                                     str):
      raise ValueError("On Linux, device should be a string !")
    elif device is not None and system() in ['Darwin', 'Windows'] and not \
        isinstance(device, int):
      raise ValueError("On Windows and Mac, device should be an integer !")

    if img_depth is not None and img_depth not in [8, 16]:
      raise ValueError('The img_depth must be either 8 or 16 (bits)')

    if user_pipeline is not None and nb_channels is None:
      raise ValueError('nb_channels must be given if user_pipeline is !')
    if user_pipeline is not None and img_depth is None:
      raise ValueError('img_depth must be given if user_pipeline is !')

    # Parsing the user pipeline if given
    if user_pipeline is not None:

      # Removing the call to gst-launch-1.0
      user_pipeline = user_pipeline.replace('gst-launch-1.0 ', '')
      # Splitting in case there's a pipe in the command
      user_pipeline = user_pipeline.split(' | ', 1)

      # There's no pipe in the pipeline
      if len(user_pipeline) == 1:
        user_pipeline = user_pipeline[0]

        # Setting the sink
        user_pipeline = user_pipeline.split(' ! ')
        user_pipeline[-1] = 'appsink name=sink'
        user_pipeline = ' ! '.join(user_pipeline)

      # There's a pipe in the pipeline
      else:
        user_application = user_pipeline[0]
        user_pipeline = user_pipeline[1]

        # Opening a subprocess handling the first half of the pipeline
        self.log(logging.INFO,
                 f"Running command {user_application.split(' ')}")
        self._process = Popen(user_application.split(' '), stdout=PIPE)
        file_descriptor = self._process.stdout.fileno()

        # Setting the source and the sink, according to the file descriptor
        # of the subprocess
        user_pipeline = user_pipeline.split(' ! ')
        user_pipeline[0] = f'fdsrc fd={file_descriptor}'
        user_pipeline[-1] = 'appsink name=sink'
        user_pipeline = ' ! '.join(user_pipeline)

    # Setting the expected properties of the image
    self._device = device
    self._user_pipeline = user_pipeline
    self._nb_channels = nb_channels if nb_channels is not None else 3
    self._img_depth = img_depth if img_depth is not None else 8

    # Defining the settings in case no custom pipeline was given
    if user_pipeline is None:

      # Checking the available formats with the Gst device monitor
      cam = None
      device_monitor = Gst.DeviceMonitor.new()
      device_monitor.start()
      devices = [device for device in device_monitor.get_devices()
                 if device.get_device_class() == "Video/Source"]

      # Stopping right here if there is no connected camera
      if not devices:
        self.log(logging.ERROR, "No camera devices available for reading !")
        raise IOError

      # Finding the specified device among the available ones
      if self._device is not None:
        for device in devices:
          properties = device.get_properties()
          api = properties.get_value('device.api')
          dev = properties.get_value('object.path')
          if dev == f'{api}:{self._device}':
            cam = device
            break

      # Raising an exception if the specified device cannot be found
      if self._device is not None and cam is None:
        self.log(logging.ERROR,
                 f"Could not open the specified camera at: {self._device}")
        raise IOError

      # Defining a default camera if the device is not specified
      if self._device is None:
        cam = devices[0]
        path = cam.get_properties().get_value('object.path')
        self.log(logging.WARNING, f"No specific camera to open specified, "
                                  f"opening {path.split(':')[1]} by default")

      # Getting the available image formats
      caps = cam.get_caps()
      nb_formats = caps.get_size()
      for i in range(nb_formats):
        form = None
        struct = caps.get_structure(i)
        name = struct.get_name()

        if name == 'image/jpeg':
          form = 'MJPG'
        elif name == 'video/x-h264':
          form = 'H264'
        elif name == 'video/x-hevc':
          form = 'HEVC'
        elif name == 'video/x-raw':
          form = struct.get_value('format')
        else:
          self.log(logging.WARNING,
                   f"Found unsupported cap format: {name}, ignoring it")

        # Formatting the format into the v4l2 syntax
        if form is not None:
          width = struct.get_value('width')
          height = struct.get_value('height')
          framerates = findall(r'(\d+/\d+)', struct.to_string())
          for fps_str in framerates:
            fps = f"{float(Fraction(fps_str)):.3f}"
            self._formats.append(f"{form} {width}x{height} ({fps} fps)")

      device_monitor.stop()

      # Checking if the formats are supported by the installed libraries
      if self._formats:
        h264_available = bool(run(['gst-inspect-1.0', 'avdec_h264'],
                                  capture_output=True, text=True).stdout)
        h265_available = bool(run(['gst-inspect-1.0', 'avdec_h265'],
                                  capture_output=True, text=True).stdout)

        if not h264_available and any(form.split()[0] == 'H264'
                                      for form in self._formats):
          self._formats = [form for form in self._formats
                           if form.split()[0] != 'H264']
          self.log(logging.WARNING,
                   "The video format H264 could be available for the "
                   "selected camera if gstreamer1.0-libav was installed !")

        if not h265_available and any(form.split()[0] == 'HEVC'
                                      for form in self._formats):
          self._formats = [form for form in self._formats
                           if form.split()[0] != 'HEVC']
          self.log(logging.WARNING,
                   "The video format H265 could be available for the "
                   "selected camera if gstreamer1.0-libav was installed !")

      if self._formats:
        # Creating the format parameter
        self.add_choice_setting(name='format',
                                choices=tuple(self._formats),
                                setter=self._set_format)

      # These settings are always available no matter the platform
      if self._nb_channels > 1:
        self.add_choice_setting(name="channels", choices=('1', '3'),
                                default='1')
      self.add_scale_setting(name='brightness', lowest=-1., highest=1.,
                             setter=self._set_brightness, default=0.)
      self.add_scale_setting(name='contrast', lowest=0., highest=2.,
                             setter=self._set_contrast, default=1.)
      self.add_scale_setting(name='hue', lowest=-1., highest=1.,
                             setter=self._set_hue, default=0.)
      self.add_scale_setting(name='saturation', lowest=0., highest=2.,
                             setter=self._set_saturation, default=1.)

    # Setting up GStreamer and the callback
    self.log(logging.INFO, "Initializing the GST pipeline")
    self.log(logging.DEBUG, f"The pipeline is {self._get_pipeline()}")
    self._pipeline = Gst.parse_launch(self._get_pipeline())
    self._app_sink = self._pipeline.get_by_name('sink')
    self._app_sink.set_property("emit-signals", True)
    self._app_sink.connect("new-sample", self._on_new_sample)

    # Starting the image acquisition
    self.log(logging.INFO, "Starting the GST pipeline")
    self._pipeline.set_state(Gst.State.PLAYING)

    # Checking that images are read as expected
    t0 = time()
    while not self._frame_nr:
      if time() - t0 > 2:
        raise TimeoutError(
            "Waited too long for the first image ! There is probably an "
            "error in the pipeline. The format of the received images may "
            "be unexpected, in which case you can try specifying it.")
      sleep(0.01)

    # Setting the kwargs if any
    self.set_all(**kwargs)
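  # For reference, a minimal standalone sketch of the device enumeration
  # performed in open() above, runnable outside of Crappy (it only assumes
  # that GStreamer and PyGObject are installed):
  #
  #   from gi import require_version
  #   require_version('Gst', '1.0')
  #   from gi.repository import Gst
  #
  #   Gst.init(None)
  #   monitor = Gst.DeviceMonitor.new()
  #   monitor.start()
  #   for dev in monitor.get_devices():
  #     if dev.get_device_class() == 'Video/Source':
  #       print(dev.get_display_name(),
  #             dev.get_properties().get_value('object.path'))
  #   monitor.stop()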
  def get_image(self) -> Tuple[float, ndarray]:
    """Reads the last image acquired from the camera.

    Returns:
      The acquired image, along with a timestamp.
    """

    # Assuming an image rate greater than 0.5 FPS
    # Checking that we don't return the same image twice
    t0 = time()
    while self._last_frame_nr == self._frame_nr:
      if time() - t0 > 2:
        raise TimeoutError("Waited too long for the next image !")
      sleep(0.01)

    self._last_frame_nr = self._frame_nr
    return time(), self.apply_soft_roi(copy(self._img))
  def close(self) -> None:
    """Simply stops the image acquisition."""

    if self._pipeline is not None:
      self.log(logging.INFO, "Stopping the GST pipeline")
      self._pipeline.set_state(Gst.State.NULL)

    # Closing the subprocess started in case a user pipeline containing a
    # pipe was given
    if self._process is not None:
      self.log(logging.INFO, "Stopping the image generating process")
      self._process.terminate()
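  # For reference, a piped user pipeline such as:
  #
  #   gphoto2 --stdout --capture-movie | gst-launch-1.0 fdsrc fd=0 ! \
  #     videoconvert ! autovideosink
  #
  # is rewritten by open() into (the fd value is illustrative):
  #
  #   fdsrc fd=3 ! videoconvert ! appsink name=sink
  #
  # with the gphoto2 half of the command running in the Popen subprocess
  # that close() terminates.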
  def _restart_pipeline(self, pipeline: str) -> None:
    """Stops the current pipeline, redefines it, and restarts it.

    Args:
      pipeline: The new pipeline to use, as a :obj:`str`.
    """

    # Stopping the previous pipeline
    self.log(logging.INFO, "Stopping the GST pipeline")
    self._pipeline.set_state(Gst.State.NULL)

    # Redefining the pipeline and the callbacks
    self.log(logging.INFO, "Initializing the GST pipeline")
    self.log(logging.DEBUG, f"The new pipeline is {pipeline}")
    self._pipeline = Gst.parse_launch(pipeline)
    self._app_sink = self._pipeline.get_by_name('sink')
    self._app_sink.set_property("emit-signals", True)
    self._app_sink.connect("new-sample", self._on_new_sample)

    # Restarting the pipeline
    self.log(logging.INFO, "Starting the GST pipeline")
    self._pipeline.set_state(Gst.State.PLAYING)

  def _get_pipeline(self,
                    brightness: Optional[float] = None,
                    contrast: Optional[float] = None,
                    hue: Optional[float] = None,
                    saturation: Optional[float] = None,
                    img_format: Optional[str] = None) -> str:
    """Generates a pipeline according to the given settings.

    If a user-defined pipeline was given, it is always returned.

    Args:
      brightness: The brightness value to set, as a :obj:`float` between -1
        and 1.
      contrast: The contrast value to set, as a :obj:`float` between 0
        and 2.
      hue: The hue value to set, as a :obj:`float` between -1 and 1.
      saturation: The saturation value to set, as a :obj:`float` between 0
        and 2.
      img_format: The image format to set, as a :obj:`str` containing the
        name of the encoding and optionally both the width and height in
        pixels.

    Returns:
      A pipeline matching the current settings values.
    """

    # Returning the custom pipeline if any
    if self._user_pipeline is not None:
      return self._user_pipeline

    # Choosing the source according to the platform
    if system() == 'Linux':
      source = 'v4l2src'
    elif system() == 'Windows':
      source = 'ksvideosrc'
    elif system() == 'Darwin':
      # avfvideosrc is the AVFoundation-based camera source on macOS
      source = 'avfvideosrc'
    else:
      source = 'autovideosrc'

    # The device argument is handled differently according to the platform
    if system() == 'Linux' and self._device is not None:
      device = f'device={self._device}'
    elif system() in ['Darwin', 'Windows'] and self._device is not None:
      device = f'device-index={self._device}'
    else:
      device = ''

    # Getting the format string if not provided as an argument
    if img_format is None and hasattr(self, 'format'):
      img_format = self.format

    # In case it goes wrong, just trying without specifying the format
    if img_format is None:
      img_format = ''

    # An IndexError is raised if the format string doesn't match the
    # expected pattern, e.g. when it is empty
    try:
      format_name, img_size, fps = findall(
          r"(\w+)\s(\w+)\s\((\d+\.\d+) fps\)", img_format)[0]
    except (ValueError, IndexError):
      format_name, img_size, fps = img_format, None, None

    # The default color format is BGR in most cases
    color = 'BGR'

    # Adding the decoder to the pipeline if needed
    if format_name == 'MJPG':
      img_format = '! jpegdec'
    elif format_name == 'H264':
      img_format = '! h264parse ! avdec_h264'
    elif format_name == 'HEVC':
      img_format = '! h265parse ! avdec_h265'
    elif format_name in ('YUYV', 'YUY2'):
      img_format = ''
    elif format_name == 'GRAY8':
      img_format = ''
      color = 'GRAY8'
    else:
      self.log(logging.WARNING,
               f"Unsupported format name: {format_name}, trying without "
               f"explicitly setting the decoder in the pipeline")
      img_format = ''

    # Getting the width and height from the second half of the string
    if img_size is not None:
      width, height = map(int, img_size.split('x'))
    else:
      width, height = None, None

    # Including the dimensions and the fps in the pipeline
    img_size = f',width={width},height={height}' if width else ''
    if fps is not None:
      fps = Fraction(fps)
      fps_str = f',framerate={fps.numerator}/{fps.denominator}'
    else:
      fps_str = ''

    # Finally, generating a single pipeline containing all the user settings
    return f"""{source} {device} name=source {img_format} ! videoconvert !
      video/x-raw,format={color}{img_size}{fps_str} !
      videobalance brightness={brightness if brightness is not None else self.brightness:.3f}
      contrast={contrast if contrast is not None else self.contrast:.3f}
      hue={hue if hue is not None else self.hue:.3f}
      saturation={saturation if saturation is not None else self.saturation:.3f} !
      appsink name=sink"""

  def _on_new_sample(self, app_sink):
    """Callback that reads every new frame and puts it into a buffer.

    Args:
      app_sink: The AppSink object containing the new frames.

    Returns:
      A GStreamer object indicating that the reading went fine.
    """

    sample = app_sink.pull_sample()
    caps = sample.get_caps()

    # Extracting the width and height info from the sample's caps
    height = caps.get_structure(0).get_value("height")
    width = caps.get_structure(0).get_value("width")

    # Getting the actual data
    buffer = sample.get_buffer()
    # Getting read access to the buffer data
    success, map_info = buffer.map(Gst.MapFlags.READ)
    if not success:
      raise RuntimeError("Could not map buffer data !")
    self.log(logging.DEBUG, "Grabbed new frame")

    # Casting the data into a numpy array
    try:
      numpy_frame = ndarray(
          shape=(height, width, self._nb_channels),
          dtype=uint8 if self._img_depth == 8 else uint16,
          buffer=map_info.data)
    except TypeError:
      raise TypeError("Unexpected number of channels in the received image !"
                      "\nYou can try adding something like ' ! videoconvert "
                      "! video/x-raw,format=BGR ! ' before your sink to "
                      "specify the format.\n(here BGR would be for 3 "
                      "channels)")

    # Converting to gray level if needed
    if (self._user_pipeline is None and hasattr(self, 'channels')
        and self.channels == '1'):
      numpy_frame = cv2.cvtColor(numpy_frame, cv2.COLOR_BGR2GRAY)

    # Cleaning up the buffer mapping
    buffer.unmap(map_info)

    self._img = squeeze(numpy_frame)
    self._frame_nr += 1

    return Gst.FlowReturn.OK

  def _set_brightness(self, brightness: float) -> None:
    """Sets the image brightness."""

    self._restart_pipeline(self._get_pipeline(brightness=brightness))

  def _set_contrast(self, contrast: float) -> None:
    """Sets the image contrast."""

    self._restart_pipeline(self._get_pipeline(contrast=contrast))

  def _set_hue(self, hue: float) -> None:
    """Sets the image hue."""

    self._restart_pipeline(self._get_pipeline(hue=hue))

  def _set_saturation(self, saturation: float) -> None:
    """Sets the image saturation."""

    self._restart_pipeline(self._get_pipeline(saturation=saturation))

  def _set_format(self, img_format) -> None:
    """Sets the image encoding and dimensions."""

    self._restart_pipeline(self._get_pipeline(img_format=img_format))

    # Reloading the software ROI selection settings
    if self._soft_roi_set and self._formats and ' ' in self._formats[0]:
      width, height = search(r'(\d+)x(\d+)', img_format).groups()
      self.reload_software_roi(int(width), int(height))
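
# For reference, a pipeline string as generated by _get_pipeline() on Linux
# looks roughly as follows (a sketch only; the device, format, dimensions
# and setting values are illustrative, assuming an MJPG format selection):
#
#   v4l2src device=/dev/video0 name=source ! jpegdec ! videoconvert !
#   video/x-raw,format=BGR,width=640,height=480,framerate=30/1 !
#   videobalance brightness=0.000 contrast=1.000 hue=0.000
#   saturation=1.000 ! appsink name=sink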