# coding: utf-8
from platform import system
from multiprocessing import Process, Value, Barrier, Event, Queue, \
get_start_method, synchronize, queues
from multiprocessing.sharedctypes import Synchronized
from multiprocessing.connection import wait
from threading import BrokenBarrierError, Thread
from queue import Empty
import logging
import logging.handlers
from time import sleep, time, time_ns
from weakref import WeakSet
from typing import Union, Optional, List, Dict, Any, Iterable
from collections import defaultdict
import subprocess
from sys import stdout, stderr, argv
from pathlib import Path
from .meta_block import MetaBlock
from ...links import Link
from ..._global import LinkDataError, StartTimeout, PrepareError, \
T0NotSetError, GeneratorStop, ReaderStop, CameraPrepareError, \
CameraRuntimeError, CameraConfigError, CrappyFail
from ...tool.ft232h import USBServer
[docs]
class Block(Process, metaclass=MetaBlock):
"""This class constitutes the base object in Crappy.
It is extremely versatile, and can perform a wide variety of actions during a
test. Many Blocks are already defined in Crappy, but it is possible to define
custom ones for specific purposes.
It is a subclass of :obj:`multiprocessing.Process`, and is thus an
independent process in Python. It communicates with other Blocks via
:mod:`multiprocessing` objects.
This class also contains the class methods that allow driving a script with
Crappy. They are always called in the `__main__` Process, and drive the
execution of all the children Blocks.
.. versionadded:: 1.4.0
"""
# Weak references to every instantiated Block, filled in by __new__
instances = WeakSet()
# Names already attributed to Blocks, filled in by get_name
names: List[str] = list()
# Maximum logging level handled by Crappy, overwritten in prepare_all
log_level: Optional[int] = logging.DEBUG
# The synchronization objects are created in prepare_all and cleared in reset
shared_t0: Optional[Synchronized] = None
ready_barrier: Optional[synchronize.Barrier] = None
start_event: Optional[synchronize.Event] = None
stop_event: Optional[synchronize.Event] = None
raise_event: Optional[synchronize.Event] = None
kbi_event: Optional[synchronize.Event] = None
# The logging objects of the main Process, set in _set_logger and prepare_all
logger: Optional[logging.Logger] = None
log_queue: Optional[queues.Queue] = None
log_thread: Optional[Thread] = None
# Flag read by _log_target, raised in _cleanup to stop the logging Thread
thread_stop: bool = False
# Flag set in launch_all and read in _cleanup to decide whether to raise
no_raise: bool = False
# Flags ensuring prepare_all, renice_all and launch_all are called in order
prepared_all: bool = False
launched_all: bool = False
[docs]
def __init__(self) -> None:
  """Initializes the parent Process and declares the instance attributes.

  Most attributes only receive a placeholder value here. The synchronization
  and logging objects are handed over to the Block by :meth:`prepare_all`.
  """

  super().__init__()

  # Links to the upstream and downstream Blocks
  self.inputs: List[Link] = list()
  self.outputs: List[Link] = list()

  # Attributes that the child classes are expected to overwrite
  self.niceness: int = 0
  self.labels: Optional[Iterable[str]] = None
  self.freq = None
  self.display_freq = False
  # Each Block gets a unique name derived from the name of its class
  self.name = self.get_name(type(self).__name__)

  # Placeholders for the synchronization objects shared by all the Blocks
  self._instance_t0: Optional[Synchronized] = None
  self._ready_barrier: Optional[synchronize.Barrier] = None
  self._start_event: Optional[synchronize.Event] = None
  self._stop_event: Optional[synchronize.Event] = None
  self._raise_event: Optional[synchronize.Event] = None
  self._kbi_event: Optional[synchronize.Event] = None

  # Placeholders and default values for the logging
  self._log_queue: Optional[queues.Queue] = None
  self._logger: Optional[logging.Logger] = None
  self._debug: Optional[bool] = False
  # May be set to None by prepare_all if logging is disabled
  self._log_level: Optional[int] = logging.INFO

  # Bookkeeping for the performance information of the Block
  self._last_t: Optional[float] = None
  self._last_fps: Optional[float] = None
  self._n_loops: int = 0
  self._last_values = None
def __new__(cls, *args, **kwargs):
  """Creates the new Block and registers it at the class level.

  The created object is added to the :obj:`~weakref.WeakSet` listing all the
  instantiated Blocks before being returned.
  """

  new_block = super().__new__(cls)
  cls.instances.add(new_block)
  return new_block
[docs]
@classmethod
def get_name(cls, name: str) -> str:
  """Builds a unique name for a new Block and records it.

  The name has the form ``crappy.<name>-<i>``, where ``i`` is the smallest
  positive integer for which the resulting name was not attributed yet.

  .. versionadded:: 2.0.0
  """

  index = 1
  candidate = f"crappy.{name}-{index}"

  # Incrementing the index until an unused name is found
  while candidate in cls.names:
    index += 1
    candidate = f"crappy.{name}-{index}"

  cls.names.append(candidate)
  return candidate
[docs]
@classmethod
def start_all(cls,
              allow_root: bool = False,
              log_level: Optional[int] = logging.DEBUG,
              no_raise: bool = False) -> None:
  """Method for starting a script with Crappy.

  It successively calls :meth:`prepare_all`, :meth:`renice_all` and
  :meth:`launch_all`, and is blocking until Crappy finishes.

  Note:
    It is possible to have a finer grained control of the start of a Crappy
    script by calling :meth:`prepare_all`, :meth:`renice_all` and
    :meth:`launch_all` separately.

  Args:
    allow_root: If set to :obj:`True`, tries to renice the Processes with
      sudo privilege in Linux. It requires the Python script to be run
      with sudo privilege, otherwise it has no effect.

      .. versionchanged:: 2.0.0 renamed from *high_prio* to *allow_root*
    log_level: The maximum logging level that will be handled by Crappy. By
      default, it is set to the lowest level (:obj:`~logging.DEBUG`) so that
      all messages are handled. If set to a higher level, the levels
      specified for each Block with the ``debug`` argument may be ignored. If
      set to :obj:`None`, logging is totally disabled. Refer to the
      documentation of the :mod:`logging` module for information on the
      possible levels.

      .. versionadded:: 2.0.0
    no_raise: When set to :obj:`False`, the Exceptions encountered during
      Crappy's execution, as well as the :exc:`KeyboardInterrupt`, will raise
      an Exception right before Crappy returns. This is meant to prevent the
      execution of code that would come after Crappy, in case Crappy does not
      terminate as expected. This behavior can be disabled by setting this
      argument to :obj:`True`.

      .. versionadded:: 2.0.0

  .. versionremoved:: 2.0.0 *t0*, *verbose*, *bg* arguments
  """

  # launch_all is the only method storing the no_raise flag, so it is stored
  # here already. Otherwise, a failure cleaned up by prepare_all or
  # renice_all would always raise, even if no_raise was requested
  cls.no_raise = no_raise

  try:
    cls.prepare_all(log_level)
    # When a failure was cleaned up without raising, Crappy was reset and the
    # prepared flag is lowered. There is then nothing left to start
    if not cls.prepared_all:
      return

    cls.renice_all(allow_root)
    if not cls.prepared_all:
      return

    cls.launch_all(no_raise)

  finally:
    # The flag must not leak to later calls if an exception was raised
    # before reset could be called
    cls.no_raise = False
[docs]
@classmethod
def prepare_all(cls, log_level: Optional[int] = logging.DEBUG) -> None:
  """Sets up the logging and the synchronization objects, then starts the
  :obj:`~multiprocessing.Process` of every instantiated Block.

  The synchronization and logging objects are first created at the class
  level, and then handed over to each Block. Once started, each Block runs
  its :meth:`prepare` method and then waits at a
  :obj:`multiprocessing.Barrier` shared with the main Process.

  If an error is caught once the synchronization objects exist, or if the
  method is called in an unexpected order, the Blocks are warned through the
  shared objects and :meth:`_cleanup` is called. Errors caught earlier are
  simply re-raised.

  Args:
    log_level: The maximum logging level that will be handled by Crappy. By
      default, it is set to the lowest level (:obj:`~logging.DEBUG`) so that
      all messages are handled. If set to a higher level, the levels
      specified for each Block with the ``debug`` argument may be ignored. If
      set to :obj:`None`, logging is totally disabled. Refer to the
      documentation of the :mod:`logging` module for information on the
      possible levels.

      .. versionadded:: 2.0.0

  .. versionremoved:: 2.0.0 *verbose* argument
  """

  # Tells the except clause whether Crappy needs to be cleaned up
  must_clean = False

  try:
    # A second call means that Crappy is already initialized, the existing
    # context is kept and cleaned up
    if cls.prepared_all:
      cls.cls_log(logging.ERROR,
                  "The method prepare_all was already called ! This is "
                  "unexpected, aborting !")
      must_clean = True
      raise RuntimeError

    # Same logic if the launched flag is raised
    if cls.launched_all:
      cls.cls_log(logging.ERROR, "The launched_all flag is unexpectedly "
                                 "raised, aborting !")
      must_clean = True
      raise RuntimeError

    cls.log_level = log_level

    # The Logger comes first so that the next steps can be logged
    cls._set_logger()
    cls.cls_log(logging.INFO,
                "===================== CRAPPY =====================")
    cls.cls_log(logging.INFO, f'Starting the script {argv[0]}\n')
    cls.cls_log(logging.INFO, 'Logger configured')

    # The Barrier has one extra party for the main Process
    cls.ready_barrier = Barrier(len(cls.instances) + 1)
    cls.shared_t0 = Value('d', -1.0)
    cls.start_event = Event()
    cls.stop_event = Event()
    cls.raise_event = Event()
    cls.kbi_event = Event()
    cls.cls_log(logging.INFO, 'Multiprocessing synchronization objects set '
                              'for main process')

    # From now on, any failure requires Crappy to be cleaned up
    must_clean = True

    # The Thread handling the log records is only started with spawn
    cls.log_queue = Queue()
    cls.log_thread = Thread(target=cls._log_target)
    if get_start_method() == 'spawn':
      cls.log_thread.start()
      cls.cls_log(logging.INFO, 'Logger thread started')

    # The USB server is only started if it was initialized
    if USBServer.initialized:
      cls.cls_log(logging.INFO, "Starting the USB server")
      USBServer.start_server(cls.log_queue, logging.INFO)

    # Handing the shared objects over to every Block
    for block in cls.instances:
      block._ready_barrier = cls.ready_barrier
      block._instance_t0 = cls.shared_t0
      block._stop_event = cls.stop_event
      block._start_event = cls.start_event
      block._raise_event = cls.raise_event
      block._kbi_event = cls.kbi_event
      block._log_queue = cls.log_queue
      cls.cls_log(logging.INFO, f'Multiprocessing synchronization objects '
                                f'set for {block.name} Block')

      # The log level of a Block cannot be lower than the global one
      if block._log_level is not None:
        if cls.log_level is None:
          block._log_level = None
        else:
          block._log_level = max(block._log_level, cls.log_level)
      cls.cls_log(logging.INFO, f"Log level set for the {block.name} "
                                f"Block")

    # Starting the Processes, the Blocks then prepare and wait at the Barrier
    for block in cls.instances:
      block.start()
      cls.cls_log(logging.INFO, f'Started the {block.name} Block')

    cls.prepared_all = True

  except (Exception, KeyboardInterrupt) as exc:
    # Nothing to clean up, the exception is simply propagated
    if not must_clean:
      raise

    if isinstance(exc, KeyboardInterrupt):
      # The KeyboardInterrupt has its own dedicated Event
      cls.cls_log(logging.WARNING, "Caught KeyboardInterrupt in the main "
                                   "Process while running prepare_all !")
      cls.kbi_event.set()
      cls.cls_log(logging.WARNING, 'Set the KbI Event after catching '
                                   'KeyboardInterrupt in the main Process '
                                   'in prepare_all')
    else:
      # Any other exception must stop the script
      cls.logger.exception("Caught exception while running prepare_all, "
                           "aborting", exc_info=exc)
      cls.raise_event.set()
      cls.cls_log(logging.WARNING, 'Set the raise Event after exception was '
                                   'caught in the main Process in '
                                   'prepare_all')

    # The Blocks waiting at the Barrier are warned by breaking it
    cls.ready_barrier.abort()
    cls.cls_log(logging.WARNING, "Broke the Barrier due to an exception "
                                 "caught in prepare_all")

    # Some Blocks might already be running and have to be stopped
    cls._cleanup()
[docs]
@classmethod
def renice_all(cls, allow_root: bool) -> None:
  """On Linux and macOS, renices the :obj:`~multiprocessing.Process`
  associated with the Blocks.

  On Windows, does nothing.

  The ``renice`` system command is called once per Block. If it returns a
  non-zero exit code, a warning is logged for the Block and the script goes
  on. If an exception is caught, performs an extensive cleanup to ensure
  everything stops as expected.

  Args:
    allow_root: If set to :obj:`True`, tries to renice the Processes with
      sudo privilege in Linux. It requires the Python script to be run with
      sudo privilege, otherwise it has no effect.

      .. versionchanged:: 2.0.0 renamed from *high_prio* to *allow_root*
  """

  # Flag indicating whether to perform the cleanup action or not
  cleanup = True

  try:
    # Making sure that the Block classmethods are called in the right order
    if not cls.prepared_all:
      cls.cls_log(logging.ERROR, "Cannot call renice before calling "
                                 "prepare ! Aborting")
      # If prepare wasn't called, there is no need to clean up Crappy
      cleanup = False
      raise RuntimeError("Cannot call renice before calling prepare ! "
                         "Aborting")

    if cls.launched_all:
      cls.cls_log(logging.ERROR, "The launched_all flag is unexpectedly "
                                 "raised, aborting !")
      raise RuntimeError

    # There's no niceness on Windows
    if system() == "Windows":
      cls.cls_log(logging.INFO, 'Not renicing processes on Windows')
      return

    # Renicing all the Blocks
    cls.cls_log(logging.INFO, 'Renicing processes')
    for inst in cls.instances:
      # If root is not allowed then the minimum niceness is 0
      niceness = max(inst.niceness, 0 if not allow_root else -20)

      # Negative values of niceness require the sudo privilege
      with_sudo = niceness < 0
      command = ['renice', str(niceness), '-p', str(inst.pid)]
      if with_sudo:
        command.insert(0, 'sudo')

      # System call for setting the niceness
      ret = subprocess.call(command, stdout=subprocess.DEVNULL)

      # A non-zero exit code means that the niceness could not be set, in
      # which case no success should be reported
      if ret != 0:
        cls.cls_log(logging.WARNING,
                    f"Could not renice process {inst.name} with PID "
                    f"{inst.pid} to niceness {niceness}, the renice "
                    f"command returned {ret}")
      elif with_sudo:
        cls.cls_log(logging.INFO, f"Reniced process {inst.name} with PID "
                                  f"{inst.pid} to niceness {niceness} "
                                  f"with sudo privilege")
      else:
        cls.cls_log(logging.INFO, f"Reniced process {inst.name} with PID "
                                  f"{inst.pid} to niceness {niceness}")

  # At that point the Blocks should be preparing or waiting at the Barrier
  except (Exception, KeyboardInterrupt) as exc:
    # If there is no specific cleanup to perform, only raising
    if not cleanup:
      raise

    # KeyboardInterrupt is a separate case
    if isinstance(exc, KeyboardInterrupt):
      cls.cls_log(logging.WARNING, "Caught KeyboardInterrupt in the main "
                                   "Process while running renice_all !")
      # Special Event for the KeyboardInterrupt
      cls.kbi_event.set()
      cls.cls_log(logging.WARNING, 'Set the KbI Event after catching '
                                   'KeyboardInterrupt in the main Process '
                                   'in renice_all')
    # General case
    else:
      cls.logger.exception("Caught exception while running renice_all, "
                           "aborting", exc_info=exc)
      # Any Exception caught in the main Process must stop the script
      cls.raise_event.set()
      cls.cls_log(logging.WARNING, 'Set the raise Event after exception was '
                                   'caught in the main Process in '
                                   'renice_all')

    # Breaking the Barrier to warn other Processes that something went wrong
    cls.ready_barrier.abort()
    cls.cls_log(logging.WARNING, "Broke the Barrier due to an exception "
                                 "caught in renice_all")

    # Need to clean up the running Blocks and other Processes / Threads
    cls._cleanup()
[docs]
@classmethod
def launch_all(cls, no_raise: bool = False) -> None:
  """Releases the Blocks and waits in the main
  :obj:`~multiprocessing.Process` until a Block finishes.

  The main Process joins the Blocks at the synchronization
  :obj:`~multiprocessing.Barrier`, sets the shared t0
  :obj:`~multiprocessing.Value` and the start
  :obj:`~multiprocessing.Event`, and then waits on the sentinels of the
  Blocks. Whatever happens next, :meth:`_cleanup` is called for stopping the
  Blocks, except if :meth:`prepare_all` was not called beforehand.

  Args:
    no_raise: When set to :obj:`False`, the Exceptions encountered during
      Crappy's execution, as well as the :exc:`KeyboardInterrupt`, will raise
      an Exception right before Crappy returns. This is meant to prevent the
      execution of code that would come after Crappy, in case Crappy does not
      terminate as expected. This behavior can be disabled by setting this
      argument to :obj:`True`.

  .. versionremoved:: 2.0.0 *t0*, *verbose* and *bg* arguments
  """

  # Stored at the class level so that _cleanup can read it
  cls.no_raise = no_raise

  # Tells the except and finally clauses whether to clean up Crappy
  do_cleanup = True

  try:
    # Without a prior call to prepare_all there is nothing to clean up
    if not cls.prepared_all:
      cls.cls_log(logging.ERROR, "Cannot call launch_all before calling "
                                 "prepare_all ! Aborting")
      do_cleanup = False
      raise RuntimeError("Cannot call launch before calling prepare ! "
                         "Aborting")

    # This method is not supposed to be called twice
    if cls.launched_all:
      cls.cls_log(logging.ERROR, "The launched_all flag is unexpectedly "
                                 "raised, aborting !")
      raise RuntimeError
    cls.launched_all = True

    # The main Process is one of the parties of the Barrier, which allows
    # calling prepare_all and launch_all separately
    cls.cls_log(logging.INFO, 'Waiting for all Blocks to be ready')
    cls.ready_barrier.wait()
    cls.cls_log(logging.INFO, 'All Blocks ready now')

    # The start time is set before the Blocks are released
    cls.shared_t0.value = time_ns() / 1e9
    cls.cls_log(logging.INFO, f'Start time set to {cls.shared_t0.value}s')
    cls.start_event.set()
    cls.cls_log(logging.INFO, 'Start event set, all Blocks can now start')

    # Blocking until at least one of the Blocks is done
    cls.cls_log(logging.INFO, 'Main Process done, waiting for all Blocks to '
                              'finish')
    sentinels = [inst.sentinel for inst in cls.instances]
    for _ in wait(sentinels):
      cls.cls_log(logging.INFO, "A Block has finished, waiting for the "
                                "other ones to follow")

  except (BrokenBarrierError, KeyboardInterrupt, Exception) as exc:
    # Nothing to clean up, the exception is simply propagated
    if not do_cleanup:
      raise

    if isinstance(exc, KeyboardInterrupt):
      # The KeyboardInterrupt has its own dedicated Event
      cls.cls_log(logging.WARNING, "Caught KeyboardInterrupt in the main "
                                   "Process while running launch_all !")
      cls.kbi_event.set()
      cls.cls_log(logging.WARNING, 'Set the KbI Event after catching '
                                   'KeyboardInterrupt in the main Process '
                                   'in launch_all')
    elif isinstance(exc, BrokenBarrierError):
      # The Barrier was broken while the main Process was waiting
      cls.cls_log(logging.ERROR, "Exception raised in a Block while waiting "
                                 "for all Blocks to be ready, stopping")
    else:
      # Any other exception must stop the script
      cls.logger.exception("Caught exception while running launch_all, "
                           "aborting", exc_info=exc)
      cls.raise_event.set()
      cls.cls_log(logging.WARNING, 'Set the raise Event after exception was '
                                   'caught in the main Process in '
                                   'launch_all')

    # The Blocks waiting at the Barrier are warned by breaking it
    cls.ready_barrier.abort()
    cls.cls_log(logging.WARNING, "Broke the Barrier due to an exception "
                                 "caught in launch_all")

  finally:
    # Stopping the Blocks and the other Processes and Threads
    if do_cleanup:
      cls._cleanup()
@classmethod
def _cleanup(cls) -> None:
  """Stops everything that Crappy started, and raises if needed.

  The stop Event is first set, and the Blocks are given a few seconds to
  finish before the ones still alive are terminated. The USBServer and the
  logging Thread are then stopped if relevant, and an error is logged if
  some Blocks are still running.

  Finally, Crappy is reset, and unless the ``no_raise`` flag is set, a
  :exc:`CrappyFail` or a :exc:`KeyboardInterrupt` is raised if the
  corresponding Event was set. This way, the code following Crappy in the
  script isn't executed unless the user explicitly catches the exception.
  """

  try:
    # Telling all the Blocks to finish
    cls.stop_event.set()
    cls.cls_log(logging.INFO, 'Stop event set, waiting for all Blocks to '
                              'finish')
    wait_start = time()

    # Polling the Blocks until they are all done
    while cls.instances and any(inst.is_alive() for inst in cls.instances):
      cls.cls_log(logging.INFO, "All Blocks not stopped yet")
      sleep(0.5)

      # The Blocks are given 3 seconds to stop by themselves
      if time() - wait_start <= 3:
        continue

      # Past this delay, the Blocks still alive are terminated
      cls.cls_log(logging.WARNING, 'All Blocks not stopped, terminating '
                                   'the living ones')
      for inst in cls.instances:
        if not inst.is_alive():
          cls.cls_log(logging.INFO, f'Block {inst.name} done')
          continue
        inst.terminate()
        cls.cls_log(logging.WARNING, f'Block {inst.name} terminated')
      break

    # The USB server only has to be stopped if it was initialized
    if USBServer.initialized:
      cls.cls_log(logging.INFO, "Stopping the USB server")
      USBServer.stop_server()

    # The logging Thread is only started with the spawn start method
    if get_start_method() == 'spawn' and cls.log_thread is not None:
      cls.thread_stop = True
      cls.log_thread.join(timeout=0.1)

    # The logging Thread should be done by now
    if cls.log_thread is not None and cls.log_thread.is_alive():
      cls.cls_log(logging.WARNING, "The Thread reading the log messages did "
                                   "not terminate in time !")

    # Last check on the Blocks, the script must raise if some are running
    if cls.instances and any(inst.is_alive() for inst in cls.instances):
      running = ', '.join(inst.name for inst in cls.instances
                          if inst.is_alive())
      cls.cls_log(logging.ERROR, f"Crappy failed to finish gracefully, "
                                 f"Block(s) {running} still running !")
      cls.raise_event.set()
      cls.cls_log(logging.WARNING, 'Set the raise Event because all the '
                                   'Blocks did not terminate as requested')
    else:
      cls.cls_log(logging.INFO, 'All Blocks done, Crappy terminated '
                                'gracefully !\n')

  # Nothing much can be done at that point, except recording what happened
  except (Exception, KeyboardInterrupt) as exc:
    if isinstance(exc, KeyboardInterrupt):
      # The KeyboardInterrupt has its own dedicated Event
      cls.cls_log(logging.WARNING, "Caught KeyboardInterrupt while "
                                   "cleaning up, ignoring it !")
      cls.kbi_event.set()
      cls.cls_log(logging.WARNING, 'Set the KbI Event after catching '
                                   'KeyboardInterrupt while cleaning up')
    else:
      # Any other exception must stop the script
      cls.logger.exception("Caught exception while cleaning up !",
                           exc_info=exc)
      cls.raise_event.set()
      cls.cls_log(logging.WARNING, 'Set the raise Event after exception was '
                                   'caught in the main Process while '
                                   'cleaning up')

  finally:
    # The nested try/finally lets the Events be read before Crappy is
    # reset, and Crappy be reset before the exception propagates
    try:
      may_raise = not cls.no_raise
      if cls.raise_event.is_set() and may_raise:
        cls.cls_log(logging.ERROR, "An error occurred during Crappy's "
                                   "execution, raising CrappyFail !")
        raise CrappyFail
      elif cls.kbi_event.is_set() and may_raise:
        cls.cls_log(logging.ERROR, "KeyboardInterrupt called while running "
                                   "Crappy, raising it !")
        raise KeyboardInterrupt("Crappy was stopped using CTRL+C ! To "
                                "disable this Exception, set the no_raise "
                                "argument of crappy.start() or "
                                "crappy.launch() to True.")
    finally:
      # The Blocks and the synchronization objects won't be used anymore
      cls.reset()
@classmethod
def _set_logger(cls) -> None:
  """Initializes the logging for the main Process.

  If the class-level log level is :obj:`None`, logging is disabled via
  :func:`logging.disable` and no handler is configured.

  Otherwise, three handlers are attached to the ``'crappy'`` Logger. One
  displays the DEBUG and INFO messages on stdout, one displays the messages
  of level WARNING and above on stderr, and one writes the messages to the
  file ``logs.txt`` in Crappy's temporary folder. No handler displays
  messages below the class-level log level.

  The handlers installed by a previous call are removed first, so that
  calling this method several times in a same interpreter doesn't duplicate
  the log messages. If the log file cannot be opened, a warning is logged
  and the messages are only displayed in the console.
  """

  # The Logger handling all messages
  crappy_log = logging.getLogger('crappy')

  # Discarding the handlers left by a previous call, only the handlers
  # created by this method carry the marker
  for old_handler in tuple(crappy_log.handlers):
    if getattr(old_handler, '_crappy_owned', False):
      crappy_log.removeHandler(old_handler)
      old_handler.close()

  # In case there's no logging, no need to configure the handlers
  # NOTE(review): logging.disable() is global and is never reverted here, so
  # logging stays disabled for later runs in the same interpreter
  if cls.log_level is None:
    logging.disable()
    cls.logger = crappy_log
    return

  crappy_log.setLevel(cls.log_level)

  # The two handlers for displaying messages in the console
  stream_handler = logging.StreamHandler(stream=stdout)
  stream_handler_err = logging.StreamHandler(stream=stderr)

  # Getting the path to Crappy's temporary folder
  os_name = system()
  if os_name in ('Linux', 'Darwin'):
    log_path = Path('/tmp/crappy')
  elif os_name == 'Windows':
    log_path = Path.home() / 'AppData' / 'Local' / 'Temp' / 'crappy'
  else:
    log_path = None

  # Creating the temporary folder if needed, and opening the log file. Any
  # OSError (missing parent, permission denied, ...) only disables the file
  # logging instead of preventing Crappy from starting
  file_handler = None
  file_error = None
  if log_path is not None:
    try:
      log_path.mkdir(parents=False, exist_ok=True)
      file_handler = logging.FileHandler(log_path / 'logs.txt', mode='w')
    except OSError as exc:
      file_error = exc

  # Setting the log levels for the handlers
  stream_handler.setLevel(max(logging.DEBUG, cls.log_level))
  stream_handler.addFilter(cls._stdout_filter)
  stream_handler_err.setLevel(max(logging.WARNING, cls.log_level))
  if file_handler is not None:
    file_handler.setLevel(max(logging.DEBUG, cls.log_level))

  # Setting the log format and adding the handlers to the Logger
  log_format = logging.Formatter('%(asctime)s %(name)s %(levelname)-8s '
                                 '%(message)s')
  for handler in (stream_handler, stream_handler_err, file_handler):
    if handler is None:
      continue
    handler.setFormatter(log_format)
    # Marker for recognizing the handler at the next call
    handler._crappy_owned = True
    crappy_log.addHandler(handler)

  cls.logger = crappy_log

  # Only warning now that the console handlers are in place
  if file_error is not None:
    crappy_log.warning(f"Could not open the log file in {log_path}, the "
                       f"log messages won't be saved to a file ! "
                       f"({file_error})")
[docs]
@classmethod
def stop_all(cls) -> None:
  """Asks all the Blocks to stop, by setting the stop
  :obj:`~multiprocessing.Event`.

  Does nothing if the stop Event does not exist yet.

  .. versionremoved:: 2.0.0 *verbose* argument
  """

  # The Event only exists once prepare_all was called
  if cls.stop_event is None:
    return

  cls.stop_event.set()
  cls.cls_log(logging.INFO, 'Stop event set after a call to stop(), all '
                            'Blocks should now finish')
[docs]
@classmethod
def reset(cls) -> None:
  """Brings the class-level state of Crappy back to its initial value.

  The :obj:`~weakref.WeakSet` of the instantiated Blocks and the list of
  attributed names are replaced with empty ones, the flags are lowered, and
  the references to the synchronization objects are dropped.

  This method is called at the very end of the :meth:`_cleanup` method, but
  can also be called to "revert" the instantiation of Blocks while Crappy
  isn't started yet.
  """

  # Forgetting the existing Blocks and their names
  cls.instances = WeakSet()
  cls.names = list()

  # Lowering all the flags
  for flag in ('thread_stop', 'prepared_all', 'launched_all', 'no_raise'):
    setattr(cls, flag, False)

  # Dropping the references to the synchronization objects
  for sync_obj in ('shared_t0', 'ready_barrier', 'start_event',
                   'stop_event', 'raise_event', 'kbi_event'):
    setattr(cls, sync_obj, None)

  if cls.logger is not None:
    cls.cls_log(logging.INFO, 'Crappy was successfully reset')
@classmethod
def cls_log(cls, level: int, msg: str) -> None:
  """Logs a message in the main Process, if the Logger is available.

  Nothing happens if the class-level Logger was not set yet, which avoids
  errors when logging before the Logger is configured.

  .. versionadded:: 2.0.0
  """

  if cls.logger is not None:
    cls.logger.log(level=level, msg=msg)
@classmethod
def _log_target(cls) -> None:
  """This method is the target to the Logger Thread.

  It reads log records from the class-level Queue and passes them to the
  Logger bearing the name of the record for handling. It loops until the
  ``thread_stop`` flag is raised.

  Once the flag is raised, the records still waiting in the Queue are
  handled before returning, so that the last messages are not lost.
  """

  while not cls.thread_stop:
    try:
      # The short timeout allows checking the stop flag on a regular basis
      record = cls.log_queue.get(block=True, timeout=0.05)
    except Empty:
      continue

    logging.getLogger(record.name).handle(record)

  # Draining the Queue without blocking, so that the Thread still returns
  # quickly after it was told to stop
  while True:
    try:
      record = cls.log_queue.get_nowait()
    except Empty:
      break

    logging.getLogger(record.name).handle(record)
@staticmethod
def _stdout_filter(rec: logging.LogRecord) -> bool:
  """Tells whether a log record has the level DEBUG or INFO.

  Returns :obj:`True` for these two levels, and :obj:`False` for any other
  level.
  """

  level = rec.levelno
  return level == logging.DEBUG or level == logging.INFO
[docs]
def run(self) -> None:
  """The method executed in the :obj:`~multiprocessing.Process` of the Block
  once it is started.

  It sets the Logger of the Block and calls :meth:`prepare`, then waits at
  the :obj:`~multiprocessing.Barrier` and for the start
  :obj:`~multiprocessing.Event`. It then calls :meth:`begin` and
  :meth:`main`. In all cases, it finally sets the stop
  :obj:`~multiprocessing.Event` and calls :meth:`finish`.

  The exceptions raised along the way are caught and logged. Depending on
  the type of exception, the raise or the KeyboardInterrupt
  :obj:`~multiprocessing.Event` is set so that the main Process knows about
  it.
  """

  # Message logged each time the raise Event is set while running
  raise_set_msg = ('Set the raise Event after catching an '
                   'unexpected Exception while running')

  try:
    # First stage, any failure here must break the Barrier
    try:
      # The Logger comes first so that the next steps can be logged
      self._set_block_logger()
      self.log(logging.INFO, "Block launched")

      # Actions to perform before the test starts
      self.log(logging.INFO, "Block preparing")
      self.prepare()

    # The other Blocks are warned by breaking the Barrier
    except (Exception, KeyboardInterrupt):
      self._ready_barrier.abort()
      self.log(logging.WARNING, "Broke the Barrier after an Exception was "
                                "caught while preparing")
      raise

    # Second stage, synchronizing with all the other Blocks
    try:
      self.log(logging.INFO, "Waiting for the other Blocks to be ready")
      self._ready_barrier.wait()
      self.log(logging.INFO, "All Blocks ready now")
    # The Barrier was broken somewhere else
    except BrokenBarrierError:
      raise PrepareError

    # The start Event is expected to be set within a second
    self.log(logging.INFO, "Waiting for the start time to be set")
    self._start_event.wait(timeout=1)
    if not self._start_event.is_set():
      raise StartTimeout
    self.log(logging.INFO, "Start time set, Block starting")

    # First loop of the Block
    self.log(logging.INFO, "Calling begin method")
    self.begin()

    # Initializing the attributes used for counting the performance
    self._last_t = time_ns() / 1e9
    self._last_fps = self._last_t
    self._n_loops = 0

    # Looping until the stop Event is set
    self.log(logging.INFO, "Entering main loop")
    self.main()
    self.log(logging.INFO, "Exiting main loop after stop Event was set")

  # Data of a wrong type was sent through a Link
  except LinkDataError:
    self.log(logging.ERROR, "Tried to send a wrong data type through a Link,"
                            " stopping !")
    self._raise_event.set()
    self.log(logging.WARNING, raise_set_msg)

  # The Barrier was broken, the raise Event is not set here
  except PrepareError:
    self.log(logging.ERROR, "Exception raised in another Block while waiting"
                            " for all Blocks to be ready, stopping")

  # Something went wrong in a configuration window
  except CameraConfigError:
    self.log(logging.ERROR, "Exception raised in a configuration window, "
                            "stopping")
    self._raise_event.set()
    self.log(logging.WARNING, raise_set_msg)

  # Something went wrong in a Camera Process while preparing
  except CameraPrepareError:
    self.log(logging.ERROR, "Exception raised in a Camera Process while "
                            "preparing, stopping")
    self._raise_event.set()
    self.log(logging.WARNING, raise_set_msg)

  # Something went wrong in a Camera Process while running
  except CameraRuntimeError:
    self.log(logging.ERROR, "Exception raised in a Camera process while "
                            "running, stopping")
    self._raise_event.set()
    self.log(logging.WARNING, raise_set_msg)

  # The start Event wasn't set in time
  except StartTimeout:
    self.log(logging.ERROR, "Waited too long for start time to be set, "
                            "aborting !")
    self._raise_event.set()
    self.log(logging.WARNING, raise_set_msg)

  # The value of t0 was requested before being set
  except T0NotSetError:
    self.log(logging.ERROR, "Trying to get the value of t0 when it's not "
                            "set yet, aborting")
    self._raise_event.set()
    self.log(logging.WARNING, raise_set_msg)

  # The Path of a Generator Block is exhausted, no Event is set
  except GeneratorStop:
    self.log(logging.WARNING, "Generator Path exhausted, stopping the "
                              "Block")

  # A FileReader Camera has no image left to read, no Event is set
  except ReaderStop:
    self.log(logging.WARNING, "Exhausted all the images to read from a "
                              "FileReader Camera, stopping the Block")

  # The KeyboardInterrupt has its own dedicated Event
  except KeyboardInterrupt:
    self.log(logging.WARNING, "KeyboardInterrupt caught, stopping")
    self._kbi_event.set()
    self.log(logging.WARNING, 'Set the KbI Event after catching a '
                              'KeyboardInterrupt while running')

  # Any other exception must stop the script
  except Exception as exc:
    self._logger.exception("Caught Exception while running !", exc_info=exc)
    self._raise_event.set()
    self.log(logging.WARNING, raise_set_msg)

  # Whatever happened, trying to close the Block properly
  finally:
    try:
      self.log(logging.INFO, "Setting the stop Event")
      self._stop_event.set()
      self.log(logging.INFO, "Calling the finish method")
      self.finish()

    # The KeyboardInterrupt is recorded but not propagated
    except KeyboardInterrupt:
      self.log(logging.WARNING, "Caught KeyboardInterrupt while finishing, "
                                "ignoring it")
      self._kbi_event.set()
      self.log(logging.WARNING, 'Set the KbI Event after catching a '
                                'KeyboardInterrupt while finishing')

    # Any other exception must stop the script
    except Exception as exc:
      self._logger.exception("Caught Exception while finishing !",
                             exc_info=exc)
      self._raise_event.set()
      self.log(logging.WARNING, 'Set the raise Event after catching an '
                                'unexpected Exception while finishing')
[docs]
def main(self) -> None:
    """Runs the main loop of the Block, as part of the :meth:`run` method.

    On each iteration the :meth:`loop` method is called once, and the looping
    frequency is then adjusted by the :meth:`_handle_freq` method. The
    iterations go on for as long as the stop Event is not set. No exception
    is caught here, so any error raised while looping propagates to the
    caller.
    """

    while True:
        # The stop Event is only checked in between two iterations
        if self._stop_event.is_set():
            return

        self.log(logging.DEBUG, "Looping")
        self.loop()

        self.log(logging.DEBUG, "Handling freq")
        self._handle_freq()
[docs]
def prepare(self) -> None:
    """Hook for initializing the Block before the test starts.

    It is meant to be overridden by the child classes, for example for
    opening a network connection or creating a file. Overriding it is
    optional, the default implementation performs no action at all.

    Note that this method is called once the :obj:`~multiprocessing.Process`
    associated to the Block has been started.
    """

    ...
[docs]
def begin(self) -> None:
    """Hook that can be seen as the very first loop of the test.

    It is called before the :meth:`loop` method, and is the right place for
    the initialization actions that cannot be carried out in the
    :meth:`prepare` method. The default implementation performs no action
    at all.
    """

    ...
[docs]
def loop(self) -> None:
    """Core method of the Block, called repeatedly during the test.

    The calls go on until the test stops or an error occurs. This is the
    only method in which data should be sent to downstream Blocks, or
    received from upstream Blocks.

    Child classes are expected to always override this method. The default
    implementation only logs a warning and then sleeps for one second, so
    that a Block with no :meth:`loop` defined doesn't flood the log.
    """

    self.log(logging.WARNING,
             "Loop method not defined, this block does nothing !")
    sleep(1)
[docs]
def finish(self) -> None:
    """Hook for properly ending the test.

    It is meant to be overridden by the child classes, for example for
    closing a file or disconnecting from a network. Overriding it is
    optional, the default implementation performs no action at all.

    Note that this method will normally be called even in case an error
    occurs, although that cannot be guaranteed.
    """

    ...
def stop(self) -> None:
    """Requests all the running Blocks to stop, by setting the stop Event.

    This method is mostly intended for users writing their own Blocks. It
    should be called from the :meth:`loop` method, and allows stopping the
    execution of the script in a clean way, without raising an exception.

    If the stop Event is not available on the Block, nothing is done.

    Note:
      Calling this method in :meth:`__init__`, :meth:`prepare` or
      :meth:`begin` is not recommended, as the Block will only stop when
      reaching the :meth:`loop` method. Calling this method during
      :meth:`finish` will have no effect.
    """

    # Without a stop Event, there is nothing that can be signaled
    if self._stop_event is None:
        return

    self.log(logging.WARNING, "stop method called, setting the stop event !")
    self._stop_event.set()
def _handle_freq(self) -> None:
"""This method ensures that the Block loops at the desired frequency, or as
fast as possible if the requested frequency cannot be achieved.
It also displays the looping frequency of the Block if requested by the
user. If no looping frequency is specified, the Block will loop as fast as
possible.
"""
self._n_loops += 1
t = time_ns() / 1e9
# Only handling frequency if requested
if self.freq is not None:
# Correcting the error of the sleep function through a recursive approach
# The last 2 milliseconds are in free loop
remaining = self._last_t + 1 / self.freq - t
while remaining > 0:
t = time_ns() / 1e9
remaining = self._last_t + 1 / self.freq - t
sleep(max(0., remaining / 2 - 2e-3))
self._last_t = t
# Displaying frequency every 2 seconds
if self.display_freq and self._last_t - self._last_fps > 2:
self.log(
logging.INFO,
f"loops/s: {self._n_loops / (self._last_t - self._last_fps)}")
self._n_loops = 0
self._last_fps = self._last_t
def _set_block_logger(self) -> None:
"""Initializes the Logger for the Block.
If the :mod:`multiprocessing` start method is `spawn` (mostly on Windows),
redirects the log messages to a Queue for passing them to the main Process.
"""
logger = logging.getLogger(self.name)
# Adjusting logging to the desired level
if self._log_level is not None:
logger.setLevel(self._log_level)
else:
logging.disable()
# On Windows, the messages need to be sent through a Queue for logging
if get_start_method() == "spawn" and self._log_level is not None:
queue_handler = logging.handlers.QueueHandler(self._log_queue)
queue_handler.setLevel(self._log_level)
logger.addHandler(queue_handler)
self._logger = logger
@property
def debug(self) -> Optional[bool]:
    """Tells whether the debug information should be displayed or not.

    The possible values are:

    * :obj:`False` (the default): the logging level of the Block is
      :obj:`~logging.INFO`.
    * :obj:`True`: the logging level of the Block is
      :obj:`~logging.DEBUG`.
    * :obj:`None`: the logging level of the Block is
      :obj:`~logging.CRITICAL`, which is equivalent to displaying no
      information at all.

    Setting this property updates the logging level accordingly. Any value
    other than :obj:`None` is interpreted based on its truth value.

    .. versionadded:: 2.0.0
    """

    return self._debug

@debug.setter
def debug(self, val: Optional[bool]) -> None:
    # The flag and the associated logging level are always updated together
    if val is None:
        self._debug, self._log_level = None, logging.CRITICAL
    elif val:
        self._debug, self._log_level = True, logging.DEBUG
    else:
        self._debug, self._log_level = False, logging.INFO
@property
def t0(self) -> float:
    """The value of t0, the exact moment when the test started, that is
    shared between all the Blocks.

    Raises:
      T0NotSetError: If the shared t0 object is not available on the Block,
        or if the value it holds is not strictly positive.

    .. versionadded:: 2.0.0
    """

    shared = self._instance_t0

    # A missing object or a non-positive value both mean t0 isn't set yet
    if shared is None or not shared.value > 0:
        raise T0NotSetError

    self.log(logging.DEBUG, "Start time value requested")
    return shared.value
[docs]
def add_output(self, link: Link) -> None:
    """Registers a :class:`~crappy.links.Link` as an output of the Block.

    The Link is appended at the end of ``self.outputs``, the list of output
    Links that the :meth:`send` method iterates over.

    Args:
      link: The :class:`~crappy.links.Link` to add to the outputs.
    """

    self.outputs.append(link)
[docs]
def log(self, log_level: int, msg: str) -> None:
    """Records a log message using the Logger of the Block.

    This method should be preferred to calling :func:`print`. As long as
    the Logger of the Block is not set, the messages are silently dropped.

    Args:
      log_level: An :obj:`int` indicating the logging level of the message.
      msg: The message to log, as a :obj:`str`.

    .. versionadded:: 2.0.0
    """

    if self._logger is not None:
        self._logger.log(log_level, msg)
[docs]
def send(self, data: Optional[Union[Dict[str, Any], Iterable[Any]]]) -> None:
    """Sends data to all the downstream Blocks.

    The exact same :obj:`dict` object is sent through every output
    :class:`~crappy.links.Link`.

    The data can be given either as a :obj:`dict`, in which case it is sent
    as is and its keys are the labels, or as another type of iterable (like
    a :obj:`list` or a :obj:`tuple`). In the second case, the values are
    first paired with the labels stored in ``self.labels`` to build the dict
    to send.

    It is up to the user to match the order of the values in the iterable
    with the order of the labels in ``self.labels``. If the number of labels
    and the number of values do not match, no error is raised but the
    elements in excess are dropped and do not get sent.

    Args:
      data: The data to send, as a :obj:`dict` or as an iterable of values.
        If :obj:`None`, nothing is sent.

    Raises:
      LinkDataError: If the data is not a :obj:`dict` and ``self.labels`` is
        :obj:`None` or empty.
      TypeError: If the data cannot be converted to a :obj:`dict`, the
        original exception is logged and re-raised.
    """

    # Just in case, not handling non-existing data
    if data is None:
        return

    if isinstance(data, dict):
        # Data given as a dict is forwarded untouched
        payload = data
    else:
        # Labels are mandatory for building a dict out of the values
        if not self.labels:
            self.log(logging.ERROR,
                     "Trying to send data as an iterable, but no labels are "
                     "specified ! Please add a self.labels attribute.")
            raise LinkDataError

        # Pairing each label with the value at the same position
        try:
            self.log(logging.DEBUG,
                     f"Converting {data} to dict before sending")
            payload = dict(zip(self.labels, data))
        except TypeError:
            self.log(logging.ERROR,
                     f"Cannot convert data to send (of type {type(data)}) "
                     f"to dict ! Please ensure that the data is given as an "
                     f"iterable, as well as self.labels.")
            raise

    # The same object goes through each of the output Links
    for output in self.outputs:
        self.log(logging.DEBUG, f"Sending {payload} to Link {output.name}")
        output.send(payload)
[docs]
def data_available(self) -> bool:
    """Tells whether there's data available for reading in at least one of
    the input :class:`~crappy.links.Link`.

    Returns:
      :obj:`True` if at least one of the input Links reports available data
      when polled, :obj:`False` otherwise. In particular, :obj:`False` is
      returned if the Block has no input Link.

    .. versionchanged:: 2.0.0 renamed from *poll* to *data_available*
    """

    self.log(logging.DEBUG, "Data availability requested")

    # Converting to bool so that a real boolean is always returned, and not
    # the inputs container itself when it is empty
    return bool(self.inputs) and any(link.poll() for link in self.inputs)
[docs]
def recv_data(self) -> Dict[str, Any]:
    """Reads once from each incoming :class:`~crappy.links.Link`, and merges
    everything that was received into a single dict.

    The returned :obj:`dict` might not always have a fixed number of keys,
    depending on the availability of incoming data.

    Also, the returned values are the oldest available in the Links. See
    :meth:`recv_last_data` for getting the newest available values.

    Important:
      If data is received over a same label from different Links, part of it
      will be lost ! The value from the Link coming last in ``self.inputs``
      overwrites the other ones. Always avoid using a same label twice in a
      Crappy script.

    Returns:
      A :obj:`dict` whose keys are the received labels and with a single
      value for each key (usually a :obj:`float` or a :obj:`str`).

    .. versionchanged:: 2.0.0 renamed from *recv_all* to *recv_data*
    """

    merged = {}

    # The Links are read in order, later ones overwrite shared labels
    for source in self.inputs:
        merged.update(source.recv())

    self.log(logging.DEBUG, f"Called recv_data, got {merged}")
    return merged
[docs]
def recv_last_data(self, fill_missing: bool = True) -> Dict[str, Any]:
    """Reads all the available values from each incoming
    :class:`~crappy.links.Link`, and returns the newest ones in a single
    dict.

    The returned :obj:`dict` might not always have a fixed number of keys,
    depending on the availability of incoming data.

    The values received from each Link are also remembered in a buffer,
    one per Link, that is created on the first call to this method.

    Important:
      If data is received over a same label from different Links, part of it
      will be lost ! Always avoid using a same label twice in a Crappy
      script.

    Args:
      fill_missing: If :obj:`True`, fills up the missing data for the known
        labels using the buffered values. This way, the last value received
        from all known labels is always returned. It can of course not fill
        up missing data for labels that haven't been received yet.

    Returns:
      A :obj:`dict` whose keys are the received labels and with a single
      value for each key (usually a :obj:`float` or a :obj:`str`).

    .. versionremoved:: 1.5.10 *num* argument
    .. versionadded:: 1.5.10 *blocking* argument
    .. versionremoved:: 2.0.0 *blocking* argument
    .. versionchanged:: 2.0.0 renamed from *get_last* to *recv_last_data*
    """

    # On the first call, creating one memory dict per input Link
    if self._last_values is None:
        self._last_values = [{} for _ in self.inputs]

    latest = {}

    # The fresh values go both to the output and to the memory of the Link
    for source, memory in zip(self.inputs, self._last_values):
        fresh = source.recv_last()
        latest.update(fresh)
        memory.update(fresh)

    # If requested, completing the output with the remembered values
    if fill_missing:
        for memory in self._last_values:
            latest.update(memory)

    self.log(logging.DEBUG, f"Called recv_last_data, got {latest}")
    return latest
[docs]
def recv_all_data(self,
                  delay: Optional[float] = None,
                  poll_delay: float = 0.1) -> Dict[str, List[Any]]:
    """Reads all the available values from each incoming
    :class:`~crappy.links.Link`, and returns them all in a single dict.

    The returned :obj:`dict` might not always have a fixed number of keys,
    depending on the availability of incoming data.

    Important:
      If data is received over a same label from different Links, part of it
      will be lost ! Without a ``delay``, only the values from the last Link
      carrying a label are kept for that label. With a ``delay``, the values
      from the different Links are appended to the same list. Always avoid
      using a same label twice in a Crappy script. See the
      :meth:`recv_all_data_raw` method for receiving data with no loss.

    Warning:
      As the time label is (normally) shared between all Blocks, the values
      returned for this label will be inconsistent and shouldn't be used !

    Args:
      delay: If given, specifies a delay in seconds, as a :obj:`float`,
        during which the method acquires data before returning. All the data
        received during this delay is saved and returned. Otherwise, just
        reads once from each Link and returns.
      poll_delay: If the ``delay`` argument is given, the Links will be
        polled once every this value seconds. It ensures that the method
        doesn't spam the CPU in vain.

    Returns:
      A :obj:`dict` whose keys are the received labels and with a
      :obj:`list` of received values for each key. The first item in the
      list is the oldest one available in the Link, the last item is the
      newest available.

    .. versionremoved:: 1.5.10 *num* argument
    .. versionadded:: 1.5.10 *blocking* argument
    .. versionremoved:: 2.0.0 *blocking* argument
    .. versionchanged:: 2.0.0 renamed from *get_all_last* to *recv_all_data*
    """

    collected = defaultdict(list)

    if delay is None:
        # Single pass over the Links, no waiting involved
        for source in self.inputs:
            collected.update(source.recv_chunk())

    else:
        start = time()

        # Polling the Links over and over until the delay is elapsed
        while time() - start < delay:
            tick = time()

            for source in self.inputs:
                for label, values in source.recv_chunk().items():
                    collected[label].extend(values)

            # Waiting for the rest of the polling period, if any is left
            sleep(max(0., tick + poll_delay - time()))

    # Handing out a regular dict rather than a defaultdict
    result = dict(collected)
    self.log(logging.DEBUG, f"Called recv_all_data, got {result}")
    return result
[docs]
def recv_all_data_raw(self,
                      delay: Optional[float] = None,
                      poll_delay: float = 0.1) -> List[Dict[str, List[Any]]]:
    """Reads all the available values from each incoming
    :class:`~crappy.links.Link`, and returns them separately in a list of
    dicts.

    Unlike :meth:`recv_all_data` this method does not fuse the received data
    into a single :obj:`dict`, so the data coming from one Link can never
    overwrite the data coming from another one. The returned list contains
    one dict per input Link, in the same order as ``self.inputs``.

    Args:
      delay: If given, specifies a delay in seconds, as a :obj:`float`,
        during which the method acquires data before returning. All the data
        received during this delay is saved and returned. Otherwise, just
        reads once from each Link and returns.
      poll_delay: If the ``delay`` argument is given, the Links will be
        polled once every this value seconds. It ensures that the method
        doesn't spam the CPU in vain.

    Returns:
      A :obj:`list` containing :obj:`dict`, whose keys are the received
      labels and with a :obj:`list` of received values for each key.

    .. versionadded:: 2.0.0
    """

    # One independent buffer for each of the input Links
    buffers = [defaultdict(list) for _ in self.inputs]

    if delay is None:
        # Single pass over the Links, no waiting involved
        for buffer, source in zip(buffers, self.inputs):
            buffer.update(source.recv_chunk())

    else:
        start = time()

        # Polling the Links over and over until the delay is elapsed
        while time() - start < delay:
            tick = time()

            for buffer, source in zip(buffers, self.inputs):
                for label, values in source.recv_chunk().items():
                    buffer[label].extend(values)

            # Waiting for the rest of the polling period, if any is left
            sleep(max(0., tick + poll_delay - time()))

    # Handing out regular dicts rather than defaultdicts
    result = [dict(buffer) for buffer in buffers]
    self.log(logging.DEBUG, f"Called recv_all_data_raw, got {result}")
    return result