# coding: utf-8
from typing import Union, Optional, Iterable, Any
import logging
from .meta_block import Block
from ..inout import inout_dict, InOut, deprecated_inouts
from ..tool.ft232h import USBServer
class IOBlock(Block):
  """This Block is meant to drive :class:`~crappy.inout.InOut` objects. It can
  acquire data, and/or set commands. One IOBlock can only drive a single InOut.

  If it has incoming :class:`~crappy.links.Link`, it will set the commands
  received over the labels given in ``cmd_labels`` by calling the
  :meth:`~crappy.inout.InOut.set_cmd` method of the InOut. Additional commands
  to set at the very beginning or the very end of the test can also be
  specified.

  If it has outgoing :class:`~crappy.links.Link`, it will acquire data using
  the :meth:`~crappy.inout.InOut.get_data` method of the InOut and send it
  downstream over the labels given in ``labels``. It is possible to trigger the
  acquisition using a predefined label.

  The ``streamer`` argument allows using the "streamer" mode of InOuts
  supporting it, instead of the regular acquisition mode. Finally, the
  ``make_zero_delay`` argument allows offsetting the acquired values to zero at
  the beginning of the test. Refer to the documentation of each argument for a
  more detailed description.

  .. versionadded:: 1.4.0
  """

  def __init__(self,
               name: str,
               labels: Optional[Union[str, Iterable[str]]] = None,
               cmd_labels: Optional[Union[str, Iterable[str]]] = None,
               trigger_label: Optional[str] = None,
               streamer: bool = False,
               initial_cmd: Optional[Union[Any, Iterable[Any]]] = None,
               exit_cmd: Optional[Union[Any, Iterable[Any]]] = None,
               make_zero_delay: Optional[float] = None,
               ft232h_ser_num: Optional[str] = None,
               spam: bool = False,
               freq: Optional[float] = 200,
               display_freq: bool = False,
               debug: Optional[bool] = False,
               **kwargs) -> None:
    """Sets the arguments and initializes the parent class.

    Args:
      name: The name of the :class:`~crappy.inout.InOut` class to instantiate.
      labels: An iterable (e.g. a :obj:`list` or a :obj:`tuple`) containing the
        output labels for InOuts that acquire data. They correspond to the
        values returned by the InOut's :meth:`~crappy.inout.InOut.get_data`
        method, so there should be as many labels as returned values, and given
        in the appropriate order. The first label must always be the time
        label, preferably called ``'t(s)'``. This argument can be omitted if
        :meth:`~crappy.inout.InOut.get_data` returns a :obj:`dict`. Ignored if
        the Block has no output Link. A single :obj:`str` is accepted and
        treated as a one-element list. If omitted while ``streamer`` is
        :obj:`True`, defaults to ``['t(s)', 'stream']``.
      cmd_labels: An iterable (e.g. a :obj:`list` or a :obj:`tuple`) containing
        the labels considered as inputs of this Block, for InOuts that set
        commands. The values received from these labels will be passed to the
        InOut's :meth:`~crappy.inout.InOut.set_cmd` method, in the same order
        as the labels are given. Usually, time is not part of the
        ``cmd_labels``. Ignored if the Block has no input Link. A single
        :obj:`str` is accepted and treated as a one-element list.
      trigger_label: If given, the Block will only read data whenever a value
        is received on this label (can be any value). Ignored if the Block has
        no output Link. A trigger label can also be a cmd label.

        .. versionchanged:: 1.5.10 renamed from *trigger* to *trigger_label*
      streamer: If :obj:`False`, the :meth:`~crappy.inout.InOut.get_data`
        method of the InOut is called for acquiring data, else it is the
        :meth:`~crappy.inout.InOut.get_stream` method. Refer to the
        documentation of these methods for more information.
      initial_cmd: An initial command for the InOut, set during
        :meth:`prepare`. If given, there must be as many values as in
        ``cmd_labels``. Should be given as an iterable (e.g. a :obj:`list` or
        a :obj:`tuple`). A single :obj:`str` or a single non-iterable value is
        also accepted, and treated as a one-element command.
      exit_cmd: A final command for the InOut, set during :meth:`finish`. If
        given, there must be as many values as in ``cmd_labels``. Should be
        given as an iterable (e.g. a :obj:`list` or a :obj:`tuple`). A single
        :obj:`str` or a single non-iterable value is also accepted, and
        treated as a one-element command.

        .. versionchanged:: 1.5.10 renamed from *exit_values* to *exit_cmd*
      make_zero_delay: If set, will acquire data before the beginning of the
        test and use it to offset all the labels to zero. The data will be
        acquired during the given number of seconds. Ignored if the Block has
        no output Links. Does not work for InOuts that acquire values other
        than numbers (:obj:`str` for example).

        .. versionadded:: 1.5.10
      ft232h_ser_num: Only used if the selected InOut class declares that it
        communicates through an FT232H (its ``ft232h`` attribute is truthy).
        In that case, this value is handed to ``USBServer.register``.
      spam: If :obj:`False`, the Block will call
        :meth:`~crappy.inout.InOut.set_cmd` on the InOut object only if the
        current command is different from the previous. Otherwise, it will call
        the method each time a command is received.
      freq: The target looping frequency for the Block. If :obj:`None`, loops
        as fast as possible.
      display_freq: If :obj:`True`, displays the looping frequency of the
        Block while running.

        .. versionchanged:: 2.0.0 renamed from *verbose* to *display_freq*
      debug: If :obj:`True`, displays all the log messages including the
        :obj:`~logging.DEBUG` ones. If :obj:`False`, only displays the log
        messages with :obj:`~logging.INFO` level or higher. If :obj:`None`,
        disables logging for this Block.

        .. versionadded:: 2.0.0
      **kwargs: The arguments to be passed to the :class:`~crappy.inout.InOut`.

    Raises:
      ValueError: If ``initial_cmd`` or ``exit_cmd`` do not contain as many
        values as ``cmd_labels``, or if ``name`` is not a known InOut.
      NotImplementedError: If ``name`` is the deprecated name of an InOut.
    """

    # These attributes are defined before initializing the parent class, and
    # are only given their final value in prepare()
    self._device: Optional[InOut] = None
    self._ft232h_args = None
    self._read: bool = False
    self._write: bool = False

    super().__init__()
    self.niceness = -10
    self.freq = freq
    self.display_freq = display_freq
    self.debug = debug

    # The labels argument can be omitted for streaming, otherwise it is
    # forced into a list (or left to None)
    if labels is None and streamer:
      self.labels = ['t(s)', 'stream']
    else:
      self.labels = self._to_list(labels)

    # Forcing the cmd_labels into a list or None
    self._cmd_labels = self._to_list(cmd_labels)

    # Forcing the commands into a list or None. A lone scalar is wrapped, so
    # that e.g. initial_cmd=0 does not fail on list(0)
    self._initial_cmd = self._to_list(initial_cmd, wrap_scalar=True)
    self._exit_cmd = self._to_list(exit_cmd, wrap_scalar=True)

    # Checking that the initial_cmd and exit_cmd length are consistent
    if self._cmd_labels is not None:
      nb_cmd = len(self._cmd_labels)
      if self._initial_cmd is not None and len(self._initial_cmd) != nb_cmd:
        raise ValueError("There should be as many values in initial_cmd as "
                         "there are in cmd_labels !")
      if self._exit_cmd is not None and len(self._exit_cmd) != nb_cmd:
        raise ValueError("There should be as many values in exit_cmd as "
                         "there are in cmd_labels !")

    self._trig_label = trigger_label

    # Checking for deprecated names
    if name in deprecated_inouts:
      raise NotImplementedError(
          f"The {name} InOut was deprecated in version 2.0.0, and renamed "
          f"to {deprecated_inouts[name]} ! Please update your code "
          f"accordingly and check the documentation for more information")

    # Checking that the requested InOut exists
    if name not in inout_dict:
      possible = ', '.join(sorted(inout_dict.keys()))
      raise ValueError(f"Unknown InOut type : {name} ! "
                       f"The possible types are : {possible}")

    self._io_name = name
    self._inout_kwargs = kwargs
    self._streamer = streamer
    self._spam = spam
    self._make_zero_delay = make_zero_delay
    self._stream_started = False
    # Last command passed to set_cmd, used when spam is False
    self._last_cmd = None
    # Last value received for each label, used for completing partial commands
    self._prev_values = {}

    # Checking whether the InOut communicates through an FT232H
    if inout_dict[self._io_name].ft232h:
      self._ft232h_args = USBServer.register(ft232h_ser_num)

  @staticmethod
  def _to_list(value: Any, wrap_scalar: bool = False) -> Optional[list]:
    """Normalizes an optional, possibly scalar argument into a :obj:`list`.

    Args:
      value: The object to normalize.
      wrap_scalar: If :obj:`True`, a value that cannot be iterated over is
        wrapped into a one-element list instead of letting :obj:`list` raise
        a :exc:`TypeError`.

    Returns:
      :obj:`None` if ``value`` is :obj:`None`, ``[value]`` if it is a
      :obj:`str` (or a non-iterable with ``wrap_scalar`` set), else
      ``list(value)``.
    """

    if value is None:
      return None

    # A str is iterable, but must be considered as a single item
    if isinstance(value, str):
      return [value]

    if wrap_scalar:
      # Only probing for iterability here, so that an error raised while
      # actually iterating is not silently swallowed
      try:
        iter(value)
      except TypeError:
        return [value]

    return list(value)

  def prepare(self) -> None:
    """Checks the consistency of the Link layout, opens the InOut and sets the
    initial command if required.

    This method mainly calls the :meth:`~crappy.inout.InOut.open` method of the
    driven InOut. If ``make_zero_delay`` was given and the Block has outgoing
    Links, the ``make_zero`` method of the InOut is also called.

    Raises:
      IOError: If the Block has neither incoming nor outgoing Links.
      ValueError: If the Block has incoming Links but neither ``cmd_labels``
        nor ``trigger_label`` was given.
    """

    # Instantiating the device in a regular way
    if self._ft232h_args is None:
      self._device = inout_dict[self._io_name](**self._inout_kwargs)
    # Instantiating the device and the connection to the FT232H
    else:
      self.log(logging.INFO, "The InOut to open communicates over an FT232H")
      self._device = inout_dict[self._io_name](**self._inout_kwargs,
                                               _ft232h_args=self._ft232h_args)

    # Checking that the block has inputs or outputs
    if not self.inputs and not self.outputs:
      raise IOError('Error ! The IOBlock is neither an input nor an output !')

    # cmd_labels must be defined when the block has inputs, unless the inputs
    # are only there for triggering the acquisition
    if self.inputs and self._cmd_labels is None and self._trig_label is None:
      raise ValueError('Error ! The IOBlock has incoming links but no '
                       'cmd_labels have been given !')

    # Data is read if there are outgoing Links, and commands are written if
    # at least one cmd label was given
    self._read = bool(self.outputs)
    self._write = bool(self._cmd_labels)

    # Now opening the device
    self.log(logging.INFO, f"Opening the {type(self._device).__name__} InOut")
    self._device.open()
    self.log(logging.INFO, f"{type(self._device).__name__} InOut opened")

    # Acquiring data for offsetting the output
    if self._read and self._make_zero_delay is not None:
      self.log(logging.INFO, f"Performing offsetting on the "
                             f"{type(self._device).__name__} InOut")
      self._device.make_zero(self._make_zero_delay)

    # Writing the first command before the beginning of the test if required
    if self._write and self._initial_cmd is not None:
      self.log(logging.INFO, f"Sending the initial command to the "
                             f"{type(self._device).__name__} InOut")
      self._device.set_cmd(*self._initial_cmd)
      # Remembering the initial command, so that it can complete a partial
      # command in loop() and be compared to the next command
      self._last_cmd = self._initial_cmd
      self._prev_values.update(zip(self._cmd_labels, self._initial_cmd))

  def loop(self) -> None:
    """Reads data from the InOut and/or sets the received commands.

    Data is read from the InOut **only** if this Block has outgoing Links. If
    the ``trigger_label`` is given, data is read only if a trigger is received
    over the given trigger label.

    A command is set on the InOut **only** if ``cmd_labels`` were given, and
    if data is received over the incoming Links. Depending on the value of the
    ``spam`` argument, a command might not be set if it is similar to the
    previous one.

    The data is read from the InOut either by calling its
    :meth:`~crappy.inout.InOut.return_stream` or its
    :meth:`~crappy.inout.InOut.return_data` method, depending on whether the
    ``streamer`` argument is :obj:`True` or :obj:`False`. The commands are
    always set by calling the :meth:`~crappy.inout.InOut.set_cmd` method.
    """

    # Receiving all the latest data waiting in the links
    data = self.recv_last_data(fill_missing=False)

    # Reading data from the device if there's no trig_label or if data has been
    # received on this trig_label
    if self._read:
      if self._trig_label is None:
        self._read_data()
      elif self._trig_label in data:
        self.log(logging.DEBUG, "Software trigger signal received")
        self._read_data()

    # If no data was received, there's nothing to write
    if not data:
      return

    if self._write:
      # The missing values are completed here, because the trig label must not
      # be artificially created
      self._prev_values.update(data)
      data.update(self._prev_values)

      # Keeping only the labels in cmd_labels
      data = {key: val for key, val in data.items() if key in self._cmd_labels}

      # If not all cmd_labels have a value, returning without calling set_cmd
      if len(data) != len(self._cmd_labels):
        self.log(logging.WARNING, f"Not enough values received in the "
                                  f"{type(self._device).__name__} InOut to"
                                  f" set the cmd, cmd not set !")
        return

      # Grouping the command values in a list before passing them to set_cmd
      cmd = [data[label] for label in self._cmd_labels]

      # Setting the command if it's different from the previous or spam is True
      if cmd != self._last_cmd or self._spam:
        self.log(logging.DEBUG, f"Writing the command {cmd} to the "
                                f"{type(self._device).__name__} InOut")
        self._device.set_cmd(*cmd)
        self._last_cmd = cmd

  def finish(self) -> None:
    """Stops the stream, sets the exit command if necessary, and closes the
    InOut.

    This method mainly calls the :meth:`~crappy.inout.InOut.close` method of
    the driven InOut. The InOut is closed even if stopping the stream or
    setting the exit command raises an exception, in which case the exception
    is still propagated afterwards.
    """

    # The device was never instantiated, there's nothing to release
    if self._device is None:
      return

    try:
      # Stopping the stream
      if self._streamer:
        self.log(logging.INFO, f"Stopping stream on the "
                               f"{type(self._device).__name__} InOut")
        self._device.stop_stream()

      # Setting the exit command
      if self._write and self._exit_cmd is not None:
        self.log(logging.INFO, f"Sending the exit command to the "
                               f"{type(self._device).__name__} InOut")
        self._device.set_cmd(*self._exit_cmd)

    finally:
      # Closing the device in any case, so that a failure above does not leave
      # it open
      self.log(logging.INFO, f"Closing the {type(self._device).__name__} "
                             f"InOut")
      self._device.close()
      self.log(logging.INFO, f"{type(self._device).__name__} InOut closed")

  def _read_data(self) -> None:
    """Reads the data or the stream, offsets the timestamp and sends the data
    to downstream Blocks.

    In streamer mode, the stream is started on the first call. Nothing is sent
    if the InOut returns :obj:`None`.
    """

    if self._streamer:
      # Starting the stream if needed
      if not self._stream_started:
        self.log(logging.INFO, f"Starting stream on the "
                               f"{type(self._device).__name__} InOut")
        self._device.start_stream()
        self._stream_started = True
      # Actually getting the stream
      data = self._device.return_stream()
    else:
      # Regular reading of data
      data = self._device.return_data()
      self.log(logging.DEBUG, f"Read values {data} from the "
                              f"{type(self._device).__name__} InOut")

    if data is None:
      return

    # Making time relative to the beginning of the test
    if isinstance(data, dict) and 't(s)' in data:
      data['t(s)'] -= self.t0
    else:
      # NOTE(review): a dict without a 't(s)' key also ends up here, and would
      # be indexed with the key 0 - presumably the time value is expected to
      # be the first item of a mutable sequence, confirm against the InOuts
      data[0] -= self.t0

    self.send(data)