# coding: utf-8
from typing import Tuple, List, Any
import numpy as np
from time import time
import logging
from warnings import warn
from .meta_camera import Camera
from .._global import OptionalModule
try:
import usb.util
import usb.core
Seek_therm_usb_req = {'Write': usb.util.CTRL_OUT |
usb.util.CTRL_TYPE_VENDOR |
usb.util.CTRL_RECIPIENT_INTERFACE,
'Read': usb.util.CTRL_IN |
usb.util.CTRL_TYPE_VENDOR |
usb.util.CTRL_RECIPIENT_INTERFACE,
'Read_img': usb.util.CTRL_IN |
usb.util.CTRL_TYPE_STANDARD |
usb.util.CTRL_RECIPIENT_INTERFACE}
except (ModuleNotFoundError, ImportError):
usb = OptionalModule("usb")
Seek_thermal_pro_vendor = 0x289D
Seek_thermal_pro_product = 0x0011
Seek_thermal_pro_commands = {'Read chip id': 0x36,
'Start get image transfer': 0x53,
'Get operation mode': 0x3D,
'Get image processing mode': 0x3F,
'Get firmware info': 0x4E,
'Get factory settings': 0x58,
'Set operation mode': 0x3C,
'Set image processing mode': 0x3E,
'Set firmware info features': 0x55,
'Set factory settings features': 0x56}
Seek_thermal_pro_dimensions = {'Width': 320,
'Height': 240,
'Raw width': 342,
'Raw height': 260}
[docs]
class SeekThermalPro(Camera):
"""Class for reading images from the Seek Thermal Pro infrared camera.
The SeekThermalPro Camera is meant for reading images from a Seek
Thermal Pro infrared camera. It communicates over USB, and gets images by
converting the received bytearrays into :mod:`numpy` arrays.
Important:
**Only for Linux users:** In order to drive the Seek Thermal Pro, the
appropriate udev rule should be set. This can be done using the
`udev_rule_setter` utility in ``crappy``'s `util` folder. It is also
possible to add it manually by running:
::
$ echo "SUBSYSTEM==\\"usb\\", ATTR{idVendor}==\\"289d\\", \
MODE=\\"0777\\\"" | sudo tee seek_thermal.rules > /dev/null 2>&1
in a shell opened in ``/etc/udev/rules.d``.
.. versionadded:: 1.4.0
.. versionchanged:: 2.0.0 renamed from *Seek_thermal_pro* to *SeekThermalPro*
"""
[docs]
def __init__(self) -> None:
"""Selects the right USB device."""
warn(f"Starting from version 2.1.0, {type(self).__name__} will be moved "
f"to crappy.collection. Your code that uses it will still work as "
f"is, except you will now need to import crappy.collection at the "
f"top of your script.", FutureWarning)
super().__init__()
self._dev = None
self._calib = None
self._dead_pixels = []
# Listing all the matching USB devices
devices = usb.core.find(find_all=True,
idVendor=Seek_thermal_pro_vendor,
idProduct=Seek_thermal_pro_product)
devices = list(devices)
# Making sure there's exactly one possible camera to read images from
if len(devices) > 1:
raise IOError("Several matching cameras found, impossible to "
"differentiate between them")
elif len(devices) == 0:
raise IOError("No matching camera found")
else:
self._dev = devices[0]
[docs]
def open(self) -> None:
"""Sets the USB communication and initializes the device."""
# Setting the USB configuration on the camera
try:
self.log(logging.INFO, f"Setting configuration on USB device "
f"{self._dev}")
self._dev.set_configuration()
except usb.core.USBError:
self.log(logging.ERROR,
"An error occurred while setting the configuration of the USB"
" device !\nYou may have to install the udev-rules for this "
"USB device, this can be done using the udev_rule_setter "
"utility in the util folder")
raise
# Initializing the camera by sending various commands to it
self.log(logging.INFO, "Configuring the camera")
self._write_data(Seek_thermal_pro_commands['Set operation mode'],
b'\x00\x00')
self._write_data(
Seek_thermal_pro_commands['Set factory settings features'],
b'\x06\x00\x08\x00\x00\x00')
self._write_data(Seek_thermal_pro_commands['Set firmware info features'],
b'\x17\x00')
self._write_data(
Seek_thermal_pro_commands['Set factory settings features'],
b'\x01\x00\x00\x06\x00\x00')
for i in range(10):
for j in range(0, 256, 32):
self._write_data(
Seek_thermal_pro_commands['Set factory settings features'],
b'\x20\x00' + bytes([j, i]) + b'\x00\x00')
self._write_data(Seek_thermal_pro_commands['Set firmware info features'],
b'\x15\x00')
self._write_data(Seek_thermal_pro_commands['Set image processing mode'],
b'\x08\x00')
self._write_data(Seek_thermal_pro_commands['Set operation mode'],
b'\x01\x00')
# Acquiring the dead pixels image and saving the dead pixes map
self.log(logging.INFO, "Getting the dead pixels")
for i in range(5):
status, ret = self._grab()
if status == 4:
self._dead_pixels = self._get_dead_pixels_list(ret)
break
elif i == 4:
self.log(logging.WARNING, "Could not get the dead pixels frame")
# Acquiring the calibration image and calibrating the camera
self.log(logging.INFO, "Calibrating the camera")
for i in range(10):
status, img = self._grab()
if status == 1:
self._calib = self._crop(img) - 1600
break
elif i == 9:
raise TimeoutError("Could not set the camera")
[docs]
def get_image(self) -> Tuple[float, np.ndarray]:
"""Reads a single image from the camera.
Returns:
The captured image as well as a timestamp.
"""
count = 0
# Looping until a valid frame is acquired
while True:
# Capturing one frame
t = time()
status, img = self._grab()
# If a calibration frame is acquired, recalibrating
if status == 1:
self.log(logging.DEBUG, "Recalibrating the camera")
self._calib = self._crop(img) - 1600
# If a valid frame is acquired, returning it along with its metadata
elif status == 3 and self._calib is not None:
return t, self._correct_dead_pixels(self._crop(img)-self._calib)
# If no valid image can be read, that's bad news
elif count == 5:
raise TimeoutError("Unable to read image")
count += 1
[docs]
def close(self) -> None:
"""Resets the camera and releases the USB resources."""
if self._dev is not None:
for _ in range(3):
self._write_data(Seek_thermal_pro_commands['Set operation mode'],
b'\x00\x00')
self.log(logging.INFO, "Releasing the USB resources")
usb.util.dispose_resources(self._dev)
def _grab(self) -> [bytes, np.ndarray]:
"""Captures a raw image from the camera.
Returns:
The status information and the raw image.
"""
# Sending the read command
self._write_data(Seek_thermal_pro_commands['Start get image transfer'],
b'\x58\x5b\x01\x00')
to_read = 2 * \
Seek_thermal_pro_dimensions['Raw width'] * \
Seek_thermal_pro_dimensions['Raw height']
ret = bytearray()
# Reading all the chunks containing the frame information
while to_read - len(ret) > 512:
ret += self._dev.read(
endpoint=Seek_therm_usb_req['Read_img'],
size_or_buffer=int(to_read / (Seek_thermal_pro_dimensions['Raw height']
/ 20)),
timeout=1000)
# Returning the read image in the right format and the associated status
status = ret[4]
if len(ret) == to_read:
return status, np.frombuffer(ret, dtype=np.uint16).reshape(
Seek_thermal_pro_dimensions['Raw height'],
Seek_thermal_pro_dimensions['Raw width'])
else:
return status, None
def _get_dead_pixels_list(self, data: np.ndarray) -> List[Tuple[Any]]:
"""Identifies the dead pixels on an image.
Args:
data: The image to identify dead pixels on.
Returns:
A :obj:`list` containing the indexes of the dead pixels.
"""
img = self._crop(np.frombuffer(data, dtype=np.uint16).reshape(
Seek_thermal_pro_dimensions['Raw height'],
Seek_thermal_pro_dimensions['Raw width']))
return list(zip(*np.where(img < 100)))
@staticmethod
def _crop(raw_img: np.ndarray) -> np.ndarray:
"""Simply crops an image to the right dimensions."""
return raw_img[4: 4 + Seek_thermal_pro_dimensions['Height'],
1: 1 + Seek_thermal_pro_dimensions['Width']]
def _correct_dead_pixels(self, img: np.ndarray) -> np.ndarray:
"""Corrects the dead pixels values.
The new value is the average value of the surrounding pixels.
Args:
img: The image to correct.
Returns:
The corrected image.
"""
for i, j in self._dead_pixels:
img[i, j] = np.median(img[max(0, i - 1): i + 2, max(0, j - 1): j + 2])
return img
def _write_data(self, request: int, data: bytes) -> int:
"""Wrapper for sending USB messages."""
self.log(logging.DEBUG, f"Sending USB command with request type "
f"{Seek_therm_usb_req['Write']}, request "
f"{request}, value {0}, index {0},length or "
f"data {data}")
try:
return self._dev.ctrl_transfer(bmRequestType=Seek_therm_usb_req['Write'],
bRequest=request,
wValue=0,
wIndex=0,
data_or_wLength=data,
timeout=None)
except usb.core.USBError:
raise IOError("An error occurred during USB communication")
def _read_data(self, request: int, data: bytes) -> int:
"""Wrapper for reading USB messages."""
self.log(logging.DEBUG, f"Sending USB command with request type "
f"{Seek_therm_usb_req['Read']}, request {request},"
f" value {0}, index {0},length or data {data}")
try:
return self._dev.ctrl_transfer(bmRequestType=Seek_therm_usb_req['Read'],
bRequest=request,
wValue=0,
wIndex=0,
data_or_wLength=data,
timeout=None)
except usb.core.USBError:
raise IOError("An error occurred during USB communication")