# Source code for crappy.blocks.ucontroller

# coding: utf-8

from struct import unpack
from time import time
from typing import Optional, Dict, Callable, Iterable, Union
import logging

from .meta_block import Block
from .._global import OptionalModule

try:
  from serial import Serial
  from serial.serialutil import SerialException
except (ModuleNotFoundError, ImportError):
  Serial = OptionalModule("pyserial")
  SerialException = OptionalModule("pyserial")


class UController(Block):
  """Block for interfacing over serial with an external device, written mostly
  for communication with microcontrollers.

  It can send labeled commands to the device, and/or receive labeled data from
  it. This Block is meant to be used along with the `microcontroller.py`
  MicroPython template located in the `tool` folder of Crappy, even though it
  is not mandatory. A given syntax needs to be followed for any data to be
  exchanged.

  .. versionadded:: 1.5.8
  """

  def __init__(self,
               labels: Optional[Union[str, Iterable[str]]] = None,
               cmd_labels: Optional[Union[str, Iterable[str]]] = None,
               init_output: Optional[Dict[str, float]] = None,
               post_process: Optional[Dict[str,
                                           Callable[[float], float]]] = None,
               t_device: bool = False,
               port: str = '/dev/ttyUSB0',
               baudrate: int = 115200,
               display_freq: bool = False,
               freq: Optional[float] = 100,
               debug: Optional[bool] = False) -> None:
    """Checks the validity of the arguments.

    Args:
      labels: An iterable (like a :obj:`list` or a :obj:`tuple`) containing
        the labels to get from the device (as :obj:`str`). Only these labels
        should be given as argument to the :meth:`send_to_pc` method in the
        MicroPython script. If this argument is not :obj:`None`, then the
        ``init_output`` argument should be given as well. No more than 9
        labels should be given. If there's only one label to acquire, it can
        be given directly as a :obj:`str` and not in an iterable.
      cmd_labels: An iterable (like a :obj:`list` or a :obj:`tuple`)
        containing the command labels that will be sent to the device upon
        reception from an upstream Block. The variables in the MicroPython
        script should have these exact names. Not more than 9 cmd_labels
        should be given. If there's only one command label, it can be given
        directly as a :obj:`str` and not in an iterable.
      init_output: If the ``labels`` argument is not :obj:`None`, the values
        to output to downstream Blocks for each label as long as no value has
        been received from the device. An initial output value must be given
        for each label. The given :obj:`dict` is copied, it is never modified
        by the Block.
      post_process: Optionally allows applying a function to the data of a
        label before transmitting it to downstream Blocks. It is possible to
        give functions for only part of the labels.
      t_device: If :obj:`True`, the timestamp returned under the label
        `'t(s)'` is the one of the device, not the one of Crappy. It may
        reduce the maximum achievable sample rate, as more bytes have to be
        transmitted, but it is also far more precise.
      port: The serial port to open for communicating with the device. In
        Windows, they are usually called `COMx`, whereas in Linux and Mac
        they're called `/dev/ttyxxxx`.
      baudrate: The baudrate for serial communication. It depends on the
        capabilities of the device.
      display_freq: If :obj:`True`, displays the looping frequency of the
        Block.

        .. versionchanged:: 2.0.0
           renamed from *verbose* to *display_freq*
      freq: The target looping frequency for the Block. If :obj:`None`, loops
        as fast as possible.
      debug: If :obj:`True`, displays all the log messages including the
        :obj:`~logging.DEBUG` ones. If :obj:`False`, only displays the log
        messages with :obj:`~logging.INFO` level or higher. If :obj:`None`,
        disables logging for this Block.

        .. versionadded:: 2.0.0

    Raises:
      TypeError: If ``display_freq``, ``freq``, ``t_device``, ``port``,
        ``init_output`` or ``post_process`` does not have the expected type,
        or if ``freq`` is not strictly positive.
      ValueError: If ``baudrate`` is not a non-negative integer, if more than
        9 labels or cmd_labels are given, or if a label has no matching
        entry in ``init_output``.
    """

    # Defined before calling the parent constructor, so that finish() can
    # always test it
    self._bus = None

    super().__init__()

    self.debug = debug

    if not isinstance(display_freq, bool):
      raise TypeError("display_freq should be either True or False !")
    self.display_freq = display_freq

    # None is a documented value for freq, and must not be rejected
    if freq is not None and (not isinstance(freq, (float, int))
                             or freq <= 0):
      raise TypeError("freq should be a positive float !")
    self.freq = freq

    if not isinstance(t_device, bool):
      raise TypeError("t_device should be either True or False !")
    self._t_device = t_device

    if not isinstance(port, str):
      raise TypeError("port should be a string !")
    self._port = port

    if not isinstance(baudrate, int) or baudrate < 0:
      raise ValueError("baudrate should be a positive integer !")
    self._baudrate = baudrate

    # Forcing the labels into a list
    if labels is None:
      self._labels = None
    elif isinstance(labels, str):
      self._labels = [labels]
    else:
      self._labels = list(labels)

    # Forcing the cmd_labels into a list
    if cmd_labels is None:
      self._cmd_labels = None
    elif isinstance(cmd_labels, str):
      self._cmd_labels = [cmd_labels]
    else:
      self._cmd_labels = list(cmd_labels)

    if self._labels is not None and len(self._labels) > 9:
      raise ValueError("Sorry, a maximum of 9 labels is allowed !")
    if self._cmd_labels is not None and len(self._cmd_labels) > 9:
      raise ValueError("Sorry, a maximum of 9 cmd_labels is allowed !")

    # Last value sent for each command label, to only send the changes
    if self._cmd_labels is not None:
      self._prev_cmd = {cmd_label: None for cmd_label in self._cmd_labels}

    if init_output is not None and not isinstance(init_output, dict):
      raise TypeError("init_output should be a dict !")
    if self._labels is not None and not all(
        label in (init_output if init_output is not None else dict())
        for label in self._labels):
      raise ValueError("Every label should have an init_output value !")
    # Working on a copy, as loop() writes into this dict
    self._out = dict(init_output) if init_output is not None else None

    if post_process is not None and (
        not isinstance(post_process, dict)
        or not all(callable(func) for func in post_process.values())):
      raise TypeError("post_process should be a dict of callables !")
    self._post_process = post_process if post_process is not None else {}

    # These attributes are populated in prepare()
    self._buffer = None
    self._cmd_table = None
    self._labels_table = None

  def prepare(self) -> None:
    """Opens the serial port, and sends a `'go'` message to the device.

    Also shares with the device two tables associating each `cmd_label` and
    `label` with an integer. This allows reducing the traffic on the serial
    bus.

    Note:
      The commands are sent as text because some boards cannot read bytes
      from the `stdin` buffer in MicroPython.

    Raises:
      IOError: If the link layout does not match the given labels, if the
        serial port cannot be opened, or if communicating with the device
        fails.
    """

    # Checking if the link layout is relevant with respect to the arguments
    if self._labels is not None and not self.outputs:
      raise IOError("labels are specified but there's no output link !")
    if self._cmd_labels is not None and not self.inputs:
      raise IOError("cmd_labels are specified but there's no input link !")

    # Buffer for storing the received bytes
    if self._labels is not None:
      self._buffer = b''

    # Opening the serial port, in non-blocking mode for reading and writing
    try:
      self.log(logging.INFO, f"Opening the serial port {self._port} with "
                             f"baudrate {self._baudrate}")
      self._bus = Serial(self._port,
                         self._baudrate,
                         timeout=0,
                         write_timeout=0)
    except SerialException as exc:
      raise IOError(f"Couldn't connect to the device on the port "
                    f"{self._port}") from exc

    # Assigning indexes to the cmd_labels and labels, to identify them easily
    # and reduce the traffic on the bus
    if self._cmd_labels is not None:
      self._cmd_table = {label: i for i, label
                         in enumerate(self._cmd_labels, start=1)}
    else:
      self._cmd_table = dict()
    self.log(logging.DEBUG, f"Command table : {self._cmd_table}")

    if self._labels is not None:
      self._labels_table = {label: i for i, label
                            in enumerate(self._labels, start=1)}
    else:
      self._labels_table = dict()

    # The presence of the label 't(s)' indicates that the device should return
    # a timestamp along with the data
    if self._labels is not None and self._t_device:
      self._labels_table.update({'t(s)': 0})
    self.log(logging.DEBUG, f"Labels table : {self._labels_table}")

    # Emptying the read and write buffers before starting
    try:
      self._bus.reset_input_buffer()
      self._bus.reset_output_buffer()
    except SerialException as exc:
      raise IOError(f"Reading from the device on port {self._port} failed, "
                    f"it may have been disconnected.") from exc

    # Sending the 'go' command to start the device, followed by the number of
    # entries in each table
    # NOTE(review): with 9 labels and t_device=True the labels table holds 10
    # entries, so its count takes two characters - confirm that the parser on
    # the device side handles this case
    self.log(logging.INFO, f"Sending start command on port {self._port}")
    msg = b''.join((b'go',
                    str(len(self._cmd_table)).encode(),
                    str(len(self._labels_table)).encode(),
                    b'\r\n'))
    self._write_to_device(msg)
    self.log(logging.DEBUG, f"Sent {msg} on the port {self._port}")

    # Sending the table of cmd_labels and their indexes
    self.log(logging.INFO, f"Sending the command labels table on port "
                           f"{self._port}")
    for cmd, i in self._cmd_table.items():
      msg = b''.join((str(i).encode(), cmd.encode(), b'\r\n'))
      self._write_to_device(msg)
      self.log(logging.DEBUG, f"Sent {msg} on the port {self._port}")

    # Sending the table of labels and their indexes
    self.log(logging.INFO, f"Sending the labels table on port {self._port}")
    for label, i in self._labels_table.items():
      msg = b''.join((str(i).encode(), label.encode(), b'\r\n'))
      self._write_to_device(msg)
      self.log(logging.DEBUG, f"Sent {msg} on the port {self._port}")

  def loop(self) -> None:
    """First sends the commands from upstream Blocks to the device, then
    reads the data from the device and sends it to the downstream Blocks.

    Important:
      The precision of the commands sent to the device is limited to 3 digits
      after the decimal point, to limit the traffic on the bus. Adapt the
      range of the command values consequently.

    Note:
      Commands are sent as text, because some boards cannot read bytes from
      the `stdin` buffer in MicroPython. Data is however received on the PC
      from the device as bytes.

    Raises:
      IOError: If writing to or reading from the serial port fails.
    """

    # Sending the commands to the device. For each label in the cmd_labels
    # list, the last received data is searched for a matching command. If the
    # command value differs from the last value sent for this label, it is
    # sent to the device, otherwise it is ignored. The command value is then
    # stored as the last value of the label.
    if self._cmd_labels is not None:
      cmd = self.recv_last_data()
      for label in self._cmd_labels:
        if label in cmd and cmd[label] != self._prev_cmd[label]:
          msg = b''.join((f'{self._cmd_table[label]}'
                          f'{cmd[label]:.3f}'.encode(), b'\r\n'))

          # Information for debugging
          self.log(logging.DEBUG, f"Sending {msg} on the port {self._port}")

          # Sending the actual message to the device
          self._write_to_device(msg)
          self._prev_cmd[label] = cmd[label]

    # Emptying the port buffer even if the messages are not processed
    if self._labels is None:
      try:
        self._bus.reset_input_buffer()
      except SerialException as exc:
        raise IOError(f"Reading from the device on port {self._port} "
                      f"failed, it may have been disconnected.") from exc
      return

    # Receiving data from the device. A frame is made of an optional 4-bytes
    # timestamp (only if t_device is True), the 1-byte index of the label and
    # the 4-bytes value, i.e. 9 or 5 bytes in total. Incomplete frames are
    # kept in the buffer and completed during the next loops.
    expected = 9 if self._t_device else 5

    # Reading the message from the device
    retries = 3
    while retries and len(self._buffer) < expected:
      try:
        # Only requesting the missing bytes, so that the buffer never holds
        # more than one frame
        recv = self._bus.read(expected - len(self._buffer))
      except SerialException as exc:
        raise IOError(f"Reading from the device on port {self._port} "
                      f"failed, it may have been disconnected.") from exc

      # This prevents the loop from staying stuck with an incomplete reading
      if not recv:
        retries -= 1
        continue

      self._buffer += recv

    # There was no message from the device, or it was incomplete
    if len(self._buffer) != expected:
      return

    # Information for debugging
    self.log(logging.DEBUG, f"Received {self._buffer} on the port "
                            f"{self._port}")

    # Parsing the received bytes
    read = unpack('<ibf' if self._t_device else '<bf', self._buffer)
    self._buffer = b''

    # Reading the timestamp if relevant, it is divided by 1000 before being
    # stored under the 't(s)' label
    if self._t_device:
      self._out['t(s)'] = read[0] / 1000
      read = read[1:]

    self.log(logging.DEBUG, f"Read value {read} from the device")

    # Updating the label value and sending the values of ALL the labels to
    # the downstream blocks. Frames with an unknown index are dropped.
    index, value = read
    for label in self._labels:
      if index == self._labels_table[label]:
        if label in self._post_process:
          value = self._post_process[label](value)
        self._out[label] = value
        if not self._t_device:
          self._out['t(s)'] = time() - self.t0
        self.send(self._out)
        break

  def finish(self) -> None:
    """Sends a `'stop!'` message to the device, and closes the serial port.

    Sending the stop message is done on a best-effort basis: a failure is
    logged but does not prevent the port from being closed.
    """

    # The port was never opened, there is nothing to release
    if self._bus is None:
      return

    # Sending a 'stop!' message to the device
    self.log(logging.INFO, f"Sending stop command on port {self._port}")
    try:
      msg = b'stop!\r\n'
      self._bus.write(msg)
      self.log(logging.DEBUG, f"Sent {msg} on the port {self._port}")
    except SerialException:
      self.log(logging.WARNING, f"Could not send the stop command on port "
                                f"{self._port}")
    finally:
      # The port is closed no matter what happened when sending the message
      self.log(logging.INFO, "Closing the serial connection")
      self._bus.close()

  def _write_to_device(self, msg: bytes) -> None:
    """Writes a message on the serial port, converting any serial error into
    an :exc:`IOError`.

    Args:
      msg: The bytes to write on the serial port.

    Raises:
      IOError: If writing on the serial port fails.
    """

    try:
      self._bus.write(msg)
    except SerialException as exc:
      raise IOError(f"Writing to the device on port {self._port} failed, "
                    f"it may have been disconnected.") from exc