# coding: utf-8
from typing import Dict, Union, Any, Optional, Iterable, List, Tuple
from time import time, sleep
from subprocess import Popen, PIPE, STDOUT, TimeoutExpired
from threading import Thread
from queue import Queue, Empty
from ast import literal_eval
from pickle import loads, dumps, UnpicklingError
from socket import timeout, gaierror
from itertools import chain
import logging
from .meta_block import Block
from .._global import OptionalModule
try:
import paho.mqtt.client as mqtt
except (ModuleNotFoundError, ImportError):
mqtt = OptionalModule("paho-mqtt")
# Type alias for the ``topics``, ``cmd_labels`` and ``labels_to_send``
# arguments: an iterable whose elements are either a single label (str) or an
# iterable of labels grouped together.
TopicsType = Iterable[Union[str, Iterable[str]]]
class ClientServer(Block):
"""This Block can exchange data on a local network using the MQTT protocol.
It communicates with an MQTT broker, from which it can receive data by
subscribing to topics, and to which it can send data by publishing in topics.
This Block can also manage the execution of a Mosquitto broker, so that the
broker doesn't need to be manually started before running the Crappy script.
This Block is intended for communication with remote devices, that may run
Crappy scripts or other scripts handling data with the correct syntax.
Potential uses include acquisition from battery-powered devices (e.g.
microcontrollers) acquiring data in enclosed areas or rotating parts, data
transfer between machines to delegate processing, communication with other
programs on a same machine, etc.
.. versionadded:: 1.4.0
.. versionchanged:: 2.0.0 renamed from *Client_server* to *ClientServer*
"""
def __init__(self,
             broker: bool = False,
             address: str = 'localhost',
             port: int = 1883,
             init_output: Optional[Dict[str, Any]] = None,
             topics: Optional[TopicsType] = None,
             cmd_labels: Optional[TopicsType] = None,
             labels_to_send: Optional[TopicsType] = None,
             display_freq: bool = False,
             freq: Optional[float] = 200,
             spam: bool = False,
             debug: Optional[bool] = False) -> None:
    """Checks the validity of the arguments and sets the instance attributes.

    Args:
      broker: If :obj:`True`, starts the Mosquitto broker during the prepare
        loop and stops it during the finish loop. If Mosquitto is not
        installed a :exc:`FileNotFoundError` is raised.
      address: The network address on which the MQTT broker is running, as a
        :obj:`str`.
      port: A network port on which the MQTT broker is listening, as an
        :obj:`int`.
      init_output: A :obj:`dict` containing for each label in ``topics`` the
        first value to be sent in the output Links. Should be given in case
        the data comes from several sources and data for all labels may not
        be available during the first loops. Must also be given if ``spam``
        is set to :obj:`True`.
      topics: An iterable (like a :obj:`list` or a :obj:`tuple`) containing
        :obj:`str` and/or iterables of :obj:`str`. Each string corresponds to
        the name of a label in Crappy. Each element in the iterable (string
        or iterable of strings) is considered to be the name of an MQTT
        topic, to which the client subscribes. After a message has been
        received on that topic, the Block returns for each label in the topic
        (just the given string or each string in the iterable) the
        corresponding data from the message. It also returns the current
        timestamp in the label `'t(s)'`.
      cmd_labels: An iterable (like a :obj:`list` or a :obj:`tuple`)
        containing :obj:`str` and/or iterables of :obj:`str`. Each string
        corresponds to the name of a label in Crappy. Each element in the
        iterable (string or iterable of strings) is considered to be the name
        of an MQTT topic, in which the client publishes. Grouping labels in a
        same topic (i.e. strings in a same iterable) allows to keep the
        synchronization between signals coming from a same Block, as they
        will be published together in a same message. This is mostly useful
        for sending a signal along with its timeframe.
      labels_to_send: An iterable (like a :obj:`list` or a :obj:`tuple`)
        containing :obj:`str` and/or iterables of :obj:`str`. Allows to
        rename the labels before publishing data. The structure of
        ``labels_to_send`` should be the exact same as ``cmd_labels``, with
        each label in ``labels_to_send`` replacing the corresponding one in
        ``cmd_labels``. This is especially useful for transferring several
        signals along with their timestamps, as the label ``'t(s)'`` should
        not appear more than once in the topics.
      display_freq: If :obj:`True`, displays the looping frequency of the
        Block.

        .. versionadded:: 1.5.10
        .. versionchanged:: 2.0.0 renamed from *verbose* to *display_freq*
      freq: The target looping frequency for the Block. If :obj:`None`, loops
        as fast as possible.

        .. versionadded:: 1.5.10
      spam: If :obj:`True`, sends the last received values at each loop even
        if no new values were received from the broker. When set to
        :obj:`True`, the ``init_output`` must be provided.

        .. versionadded:: 1.5.10
      debug: If :obj:`True`, displays all the log messages including the
        :obj:`~logging.DEBUG` ones. If :obj:`False`, only displays the log
        messages with :obj:`~logging.INFO` level or higher. If :obj:`None`,
        disables logging for this Block.

        .. versionadded:: 2.0.0

    Raises:
      ValueError: If ``labels_to_send`` is given without ``cmd_labels``, or
        if it does not contain as many topics as ``cmd_labels``.

    Note:
      - ``broker``:
        In order for the Block to run, an MQTT broker must be running at the
        specified address on the specified port. If not, a
        :exc:`ConnectionRefusedError` is raised. The broker can be started
        and stopped manually by the user independently of the execution of
        Crappy. It also doesn't need to be Mosquitto, any other MQTT broker
        can be used.

      - ``topics``:
        The presence of the same label in multiple topics will most likely
        lead to a data loss.

      - ``cmd_labels``:
        It is not possible to group signals coming from different Blocks in a
        same topic.

      - ``labels_to_send``:
        Differences in the structure of ``labels_to_send`` and ``cmd_labels``
        will not always raise an error, but may lead to a data loss.

      - **Single-value iterables**:
        Single-value iterables can be shortened as strings.
        ::

          topics=[('cmd1',), ('cmd2',)]
          cmd_labels=[('cmd1',), ('cmd2',)]
          labels_to_send=[('cmd1',), ('cmd2',)]

        is equivalent to
        ::

          topics=['cmd1', 'cmd2']
          cmd_labels=['cmd1', 'cmd2']
          labels_to_send=['cmd1', 'cmd2']

    Examples:
      - ``topics``:
        If
        ::

          topics=[('t1', 'cmd1'), 'sign']

        the client will subscribe to the topics
        ::

          ('t1', 'cmd1')
          ('sign',)

        The Block will return data associated with the labels
        ::

          't1', 'cmd1'
          'sign'

      - ``cmd_labels``:
        If
        ::

          cmd_labels=[('t1', 'cmd1'), 'sign']

        the client will publish data in the form of
        ::

          [[t1_0, cmd1_0], [t1_1, cmd1_1], ...]
          [[sign_0], [sign_1], ...]

        in the topics
        ::

          ('t1', 'cmd1')
          ('sign',)

      - ``labels_to_send``:
        If
        ::

          cmd_labels=[('t(s)', 'cmd'), 'sign']
          labels_to_send=[('t1', 'cmd1'), 'sign']

        the data from labels
        ::

          't(s)', 'cmd'

        will be published in the topic
        ::

          ('t1', 'cmd1')

        and the data from label
        ::

          'sign'

        in the topic
        ::

          ('sign',)
    """

    # Declared before the parent's __init__ so that they always exist
    self._client: Optional[mqtt.Client] = None
    self._reader: Optional[Thread] = None
    self._proc: Optional[Popen] = None

    super().__init__()
    self.niceness = -10
    self.display_freq = display_freq
    self.freq = freq
    self.debug = debug

    # Setting the args
    self._broker = broker
    self._address = address
    self._port = port
    self._spam = spam
    self._init_output = init_output if init_output is not None else dict()
    # Flag read by the thread that forwards Mosquitto's output to the log
    self._stop_mosquitto = False

    # These attributes may be set later
    self._topics: Optional[List[Tuple[str, ...]]] = None
    self._last_out_val: Dict[str, Any] = dict()
    self._buffer_output: Optional[Dict[Tuple[str, ...], Queue]] = None
    self._cmd_labels: Optional[List[Tuple[str, ...]]] = None
    # Maps each topic of cmd_labels to the topic name to publish under
    self._labels_to_send: Optional[Dict[Tuple[str, ...],
                                        Tuple[str, ...]]] = None

    if topics is None and cmd_labels is None:
        self.log(logging.WARNING, "The Client-server Block is neither an input "
                                  "nor an output !")

    # Preparing for receiving data
    if topics is not None:
        # Replacing strings with tuples
        self._topics = [(topic,) if isinstance(topic, str) else tuple(topic)
                        for topic in topics]

        # The last out vals are given for each label, not each topic
        self._last_out_val = {label: None
                              for label in chain.from_iterable(self._topics)}

    # Preparing for publishing data
    if cmd_labels is not None:
        # Replacing strings with tuples
        self._cmd_labels = [(topic,) if isinstance(topic, str)
                            else tuple(topic) for topic in cmd_labels]

    if labels_to_send is not None:
        # Replacing strings with tuples
        labels_to_send = [(topic,) if isinstance(topic, str) else tuple(topic)
                          for topic in labels_to_send]

        # Making sure the labels to send have the correct syntax. Renaming
        # without any cmd_labels is rejected explicitly, rather than letting
        # len(None) raise an unrelated TypeError
        if (self._cmd_labels is None
                or len(labels_to_send) != len(self._cmd_labels)):
            raise ValueError("Either a label_to_send should be given for "
                             "every cmd_label, or none should be given ")

        # Preparing to rename labels to send using a dictionary
        self._labels_to_send = dict(zip(self._cmd_labels, labels_to_send))
def prepare(self) -> None:
    """Checks the Links, optionally starts Mosquitto, and connects the client.

    Raises:
      ValueError: If ``topics`` were given but the Block has no output Link,
        if ``cmd_labels`` were given but the Block has no input Link, or if
        the address of the broker cannot be resolved.
      TimeoutError: If the connection attempt times out.
      ConnectionRefusedError: If the broker refused the connection on each
        of the 15 attempts.
    """

    receiving = self._topics is not None
    publishing = self._cmd_labels is not None

    # Each direction of communication requires the matching Links
    if receiving and not self.outputs:
        raise ValueError("topics are specified but there's no output link !")
    if publishing and not self.inputs:
        raise ValueError("cmd_labels are specified but there's no input link !")

    # One queue of received data per subscribed topic. Built here rather
    # than in __init__ because Queue objects cannot be set during __init__
    # in spawn multiprocessing mode
    if receiving:
        queues = dict()
        for topic in self._topics:
            queues[topic] = Queue()
        self._buffer_output = queues

    if self._broker:
        self.log(logging.INFO,
                 f"Starting the Mosquitto broker on port {self._port}")
        self._launch_mosquitto()

        # A dedicated Thread forwards the stdout of the broker to the log
        reader = Thread(target=self._output_reader)
        self._reader = reader
        reader.start()

        # Leaving some time for the broker to be up and running
        sleep(2)
        self.log(logging.INFO, "Waiting for Mosquitto to start")
        sleep(2)

    # Like the queues, the client cannot be set during __init__ in spawn
    # multiprocessing mode
    client = mqtt.Client(str(time()))
    client.on_connect = self._on_connect
    client.on_message = self._on_message
    client.reconnect_delay_set(max_delay=10)
    self._client = client

    # Only refused connections are retried, waiting one second in between.
    # The other failures abort right away
    max_attempts = 15
    for attempts_left in range(max_attempts - 1, -1, -1):
        try:
            self.log(logging.INFO,
                     f"Connecting to the broker at address {self._address} "
                     f"on port {self._port}")
            self._client.connect(self._address, port=self._port,
                                 keepalive=10)
        except timeout:
            raise TimeoutError("Impossible to reach the given address, "
                               "aborting")
        except gaierror:
            raise ValueError("Invalid address given, please check the "
                             "spelling")
        except ConnectionRefusedError:
            if not attempts_left:
                raise ConnectionRefusedError(
                    "Connection refused, the broker may not be running or "
                    "you may not have the rights to connect")
            sleep(1)
        else:
            break

    self.log(logging.INFO, "Starting the client loop")
    self._client.loop_start()
def loop(self) -> None:
    """Receives data from the broker and/or sends data to the broker.

    The received data is then sent to the crappy Blocks connected to this one.

    Raises:
      ValueError: If data has to be sent to the downstream Blocks but a label
        has neither a received value nor a value in ``init_output``.
    """

    # Receiving side.
    # At most one message is popped from the queue of each topic, and its
    # values are associated to the labels of the topic. All the labels
    # should be returned together, so the last value of each label is
    # remembered and reused when no new value was received for it. Before
    # any value was received for a label, its init value is used instead.
    if self._topics is not None:
        dict_out = dict()
        for topic, buffer in self._buffer_output.items():
            try:
                data_list = buffer.get_nowait()
            except Empty:
                # Nothing new on this topic
                continue
            dict_out.update(zip(topic, data_list))

        # Data is only sent if something was received, or on every loop in
        # spam mode
        if dict_out or self._spam:
            for label in chain.from_iterable(self._buffer_output):
                if label in dict_out:
                    # Fresh value, remembering it for the next loops
                    self._last_out_val[label] = dict_out[label]
                elif self._last_out_val[label] is not None:
                    dict_out[label] = self._last_out_val[label]
                elif label in self._init_output:
                    dict_out[label] = self._init_output[label]
                else:
                    raise ValueError(f"No value received for the topic {label}"
                                     f" and no init value given !")

            # Adding the timestamp before sending
            dict_out['t(s)'] = time() - self.t0
            self.send(dict_out)

    # Publishing side.
    # For each topic, the first received dictionary containing all of the
    # labels of the topic is published as a list holding the data of each
    # label.
    if self._cmd_labels is not None:
        received_data = self.recv_all_data_raw()
        for topic in self._cmd_labels:
            for dic in received_data:
                if dic is None or not all(label in dic for label in topic):
                    continue

                # Renaming the topic if requested by the user
                if self._labels_to_send is not None:
                    topic_to_send = self._labels_to_send[topic]
                else:
                    topic_to_send = topic

                values = [dic[label] for label in topic]
                self._client.publish(topic=str(topic_to_send),
                                     payload=dumps(values),
                                     qos=0)
                self.log(logging.DEBUG, f"Sent {values} on the topic "
                                        f"{topic_to_send} with QOS 0")
                # Only one dictionary is published per topic and per loop
                break
def finish(self) -> None:
    """Disconnects from the broker and stops it.

    The broker is only stopped if it was started by this Block. It is first
    asked to terminate, and killed if it did not exit within 15 seconds. In
    both cases the thread reading the output of the broker is then told to
    stop.
    """

    # Disconnecting from the broker
    if self._client is not None:
        self.log(logging.INFO, "Stopping the client loop")
        self._client.loop_stop()
        self.log(logging.INFO, "Disconnecting from the broker")
        self._client.disconnect()

    # Stopping the broker
    if self._broker and self._proc is not None:
        try:
            self.log(logging.INFO, "Stopping the Mosquitto broker")
            self._proc.terminate()
            self._proc.wait(timeout=15)
            self.log(logging.INFO, f"Mosquitto terminated with return code "
                                   f"{self._proc.returncode}")
        except TimeoutExpired:
            self.log(logging.WARNING, "Mosquitto did not terminate in time, "
                                      "killing it")
            self._proc.kill()
        finally:
            # The reader thread must be released whichever way the broker
            # was stopped, otherwise it keeps polling the flag forever
            self._stop_mosquitto = True
            if self._reader is not None:
                self._reader.join(0.2)
                if self._reader.is_alive():
                    self.log(logging.WARNING,
                             "Reader thread failed to stop !")
def _launch_mosquitto(self) -> None:
    """Spawns the ``mosquitto`` executable listening on the Block's port.

    The standard error of the process is merged into its standard output,
    which is piped back to this Block.

    Raises:
      FileNotFoundError: If the ``mosquitto`` executable could not be found.
    """

    command = ['mosquitto', '-p', str(self._port)]
    try:
        broker_process = Popen(command, stdout=PIPE, stderr=STDOUT)
    except FileNotFoundError:
        raise FileNotFoundError("Mosquitto is not installed !")
    self._proc = broker_process
def _output_reader(self) -> None:
    """Forwards the lines printed by the Mosquitto subprocess to the log.

    Runs until the ``_stop_mosquitto`` flag is set. A warning is also logged
    when a line reports that the address is already in use.
    """

    port_busy_marker = 'Error: Address already in use'

    while not self._stop_mosquitto:
        # readline returns b'' once nothing more can be read, which ends the
        # inner iteration
        for raw_line in iter(self._proc.stdout.readline, b''):
            text = raw_line.decode()
            self.log(logging.INFO, f"[Mosquitto] {text}")
            if port_busy_marker in text:
                self.log(logging.WARNING,
                         "Mosquitto is already running on this port !")
        # Short pause before checking the stop flag again
        sleep(0.1)
def _on_message(self, _, __, message) -> None:
    """Buffers the data received in a message from the broker.

    The payload of the message is unpickled and expected to be a list of
    lists of values, one list per label of the topic. It is transposed, and
    each resulting list of values (one value per label) is put in the queue
    associated with the topic of the message.

    A message that cannot be decoded, or whose topic does not match any
    buffer, is logged and ignored so that a single bad message cannot raise
    in the callback.

    Args:
      _: Unused positional argument of the callback.
      __: Unused positional argument of the callback.
      message: The received message, whose ``topic`` and ``payload``
        attributes are read.
    """

    try:
        self.log(logging.DEBUG, f"Received message from the broker: {message}")
        # NOTE(security): unpickling can execute arbitrary code, so the
        # broker should only be reachable by trusted publishers
        rows = loads(message.payload)
        # The topic is the string representation of a tuple of labels
        buffer = self._buffer_output[literal_eval(message.topic)]
        for data_points in zip(*rows):
            buffer.put_nowait(list(data_points))
    except UnpicklingError:
        self.log(logging.WARNING, f"Received message caused UnpicklingError, "
                                  f"ignoring it: {message}")
    except (EOFError, AttributeError, ImportError, IndexError, KeyError,
            SyntaxError, TypeError, ValueError) as exc:
        # Raised by pickle on truncated or foreign payloads, by literal_eval
        # on topics that are not Python literals, or by the buffer lookup
        self.log(logging.WARNING,
                 f"Could not process the received message "
                 f"({type(exc).__name__}: {exc}), ignoring it: {message}")
def _on_connect(self, _, __, ___, rc: Any) -> None:
    """Subscribes to every requested topic once connected to the broker.

    Args:
      _: Unused positional argument of the callback.
      __: Unused positional argument of the callback.
      ___: Unused positional argument of the callback.
      rc: The return code of the connection, only used for logging.
    """

    self.log(logging.INFO, f"Connected to the broker with return code {rc}")

    # Subscriptions are made in this callback so that they are renewed each
    # time the client reconnects after a connection loss
    subscriptions = self._topics if self._topics is not None else ()
    for topic in subscriptions:
        # The name of the MQTT topic is the string form of the labels tuple
        self._client.subscribe(topic=str(topic), qos=0)
        self.log(logging.INFO, f"Subscribed to topic {topic}")

    # NOTE(review): prepare() already starts the client loop, so this call
    # is presumably redundant - confirm before removing it
    self._client.loop_start()