Block

class crappy.blocks.block.Block(*args, **kwargs)[source]

This represent a Crappy block, it must be parent of all the blocks.

add_input(in_: Link) None[source]

Adds a Link as an input.

add_output(out: Link) None[source]

Adds a Link as an output.

classmethod all_are(s: str) bool[source]

Returns True only if all processes status are s.

begin() None[source]

If main() is not overridden, this method will be called first, before entering the main loop.

drop(num: list | str | None = None) None[source]

Will clear the inputs of the blocks.

This method performs like get_last(), but returns None instantly.

finish() None[source]

If main() is not overridden, this method will be called upon exit or after a crash.

get_all_last(blocking: bool = True) Dict[str, list][source]

To get the data from all links of the block.

It is almost the same as get_last(), but will return all the data that goes through the links (as list).

Also, if multiple links have the same label, only the last link’s value will be kept.

Parameters:

blocking – If True, blocks until there’s at least one value received for each label.

get_last(blocking: bool = True) Dict[str, Any][source]

To get the latest value of each labels from all inputs.

Warning

Unlike the recv methods of Link, this method is NOT guaranteed to return all the data going through the links!

It is meant to get the latest values, discarding all the previous ones (for a displayer for example).

Its mode of operation is completely different since it can operate on multiple inputs at once.

Note

The first call may be blocking until it receives data, all the others will return instantaneously, giving the latest known reading.

handle_freq() None[source]

For block with a given number of loops/s (use freq attribute to set it).

launch(t0: float) None[source]

To start the main() method, will call Process.start() if needed.

Once the process is started, calling launch will set the starting time and actually start the main method.

Parameters:

t0 (float) – Time to set as starting time of the block (mandatory, in seconds after epoch).

main() None[source]

This is where you define the main loop of the block.

Important

If not overridden, will raise an error.

poll() bool[source]

Tells if any input link has pending data

Returns True if l.poll() is True for any input link l.

prepare() None[source]

This will be run when creating the process, but before the actual start.

The first code to be run in the new process, will only be called once and before the actual start of the main launch of the blocks.

It can remain empty and do nothing.

classmethod prepare_all(verbose: bool = True) None[source]

Starts all the blocks processes (block.prepare), but not the main loop.

recv_all() Dict[str, list][source]

Receives new data from all the inputs (not as chunks).

It will simply call Pipe.recv() on all non empty links and return a single dict.

Important

If the same label comes from multiple links, it may be overridden !

recv_all_delay(delay: float | None = None, poll_delay: float = 0.1) List[Dict[str, list]][source]

Method to wait for data, but continuously reading all the links to make sure they do not saturate.

Parameters:
  • delay – The method only returns after this delay (in seconds). If None or 0, it returns after polling the pipes just once.

  • poll_delay – The delay (in seconds) between two pipe polls. It is safer to keep it lower than 0.1s. Not used when delay is None or 0.

Returns:

A list where each entry is what would have been returned by Link.recv_chunk() on each link.

recv_all_last() Dict[str, float][source]

Like recv_all, but drops older data to return only the latest value

This method avoids Pipe congestion that can be induced by recv_all when the receiving block is slower than the block upstream

classmethod renice_all(high_prio: bool = True, **_) None[source]

Will renice all the blocks processes according to block.niceness value.

Note

If high_prio is False, blocks with a negative niceness value will be ignored.

This is to avoid asking for the sudo password since only root can lower the niceness of processes.

run() None[source]

Method to be run in sub-process; can be overridden in sub-class

send(data: Dict[str, float] | List[float]) None[source]

To send the data to all blocks downstream.

Send has 2 ways to operate. You can either build the dict yourself, or you can define self.labels (usually time first) and call send with a list. It will then map them to the dict.

Note

ONLY dict can go through links.

property status: str

Returns the status of the block, from the process itself or the parent.

It can be:

  • “idle”: Block not started yet.

  • “initializing”: start() was called and prepare() is not over yet.

  • “ready”: prepare() is over, waiting to start main() by calling launch().

  • “running”: main() is running.

  • “done”: main() is over.

  • “error”: An error occurred and the block stopped.

classmethod stop_all(verbose: bool = True) None[source]

Stops all the blocks (crappy.stop).

crappy.blocks.block.renice(pid: int, niceness: int) None[source]

Function to renice a process.

Warning

Only works in Linux.

Note

The user must be allowed to use sudo to renice with a negative value. May thus ask for a password for negative values.