Source code for crappy.inout.spectrum_m2i4711

# coding: utf-8

import numpy as np
from time import time
from typing import List, Optional, Iterable
import logging
from  warnings import warn

from .meta_inout import InOut
from ..tool.bindings import pyspcm as spc


[docs] class SpectrumM2I4711(InOut): """This class can read data from a Spectrum high speed ADC interfacing over PCIe. It can acquire data over multiple channels, and set for each channel a different voltage range. It is possible to tune the sample rate, the chunk size and the memory allocated to the data buffer. This class can only acquire data by streaming, it cannot acquire single data points. .. versionadded:: 1.4.0 .. versionchanged:: 2.0.0 renamed from *Spectrum* to *SpectrumM2I4711* """
[docs] def __init__(self, channels: Iterable[int], device: str = '/dev/spcm0', ranges: Optional[Iterable[int]] = None, sample_rate: int = 100000, buff_size: int = 2**26, notify_size: int = 2**16) -> None: """Sets the arguments and initializes the parent class. Args: channels: An iterable (like a :obj:`list` or a :obj:`tuple`) of all the channels to read data from, given as :obj:`int`. Refer to the documentation of the Spectrum board to know which combinations of channels are allowed. device: The address of the device to read data from, as a :obj:`str`. ranges: An iterable (like a :obj:`list` or a :obj:`tuple`) indicating for each channel the range of the acquisition in mV, as an :obj:`int`. There should be as many values in this iterable as there are channels. If not given, all ranges default to `10000` mV. sample_rate: The sample rate of the acquisition for all channels, in Hz. The default is 100KHz. .. versionchanged:: 1.5.10 renamed from *samplerate* to *sample_rate* buff_size: The size of the memory allocated as a rolling buffer to copy the data from the card, in bytes. The default is 67MB. notify_size: The size of each chunk of data to copy from the card, in bytes. The default is 65kB. .. versionremoved:: 1.5.10 *split_chan* argument """ warn(f"Starting from version 2.1.0, {type(self).__name__} will be moved " f"to crappy.collection. Your code that uses it will still work as " f"is, except you will now need to import crappy.collection at the " f"top of your script.", FutureWarning) self._spectrum = None super().__init__() # Setting the args self._channels = list(channels) self._device = device.encode() self._ranges = ranges if ranges is not None else [10000 for _ in channels] self._sample_rate = sample_rate self._buff_size = buff_size self._notify_size = notify_size self._chunk_size = notify_size // (2 * len(self._channels)) self.log(logging.INFO, f"Will send {2 * sample_rate * len(self._channels) / notify_size}" f" chunks of {notify_size / 1024} kB per second " f"({sample_rate * len(self._channels) / 512} kB/s)") # These attributes will be set later self._dt = None self._buff = None self._stream_t0 = 0 self._n_points = 0 self._stream_started = False
[docs] def open(self) -> None: """Opens and configures the Spectrum, and sets the ranges and the sample rate as requested.""" self.log(logging.INFO, "Opening the connection to the Spectrum") self._spectrum = spc.hOpen(self._device) # Configuring the Spectrum self.log(logging.INFO, "Configuring the Spectrum") spc.dw_set_param(self._spectrum, spc.SPC_CHENABLE, sum([2 ** chan for chan in self._channels])) spc.dw_set_param(self._spectrum, spc.SPC_CARDMODE, spc.SPC_REC_FIFO_SINGLE) spc.dw_set_param(self._spectrum, spc.SPC_TIMEOUT, 5000) spc.dw_set_param(self._spectrum, spc.SPC_TRIG_ORMASK, spc.SPC_TMASK_SOFTWARE) spc.dw_set_param(self._spectrum, spc.SPC_TRIG_ANDMASK, 0) spc.dw_set_param(self._spectrum, spc.SPC_CLOCKMODE, spc.SPC_CM_INTPLL) # Setting for each channel its range for range_, chan in zip(self._ranges, self._channels): spc.dw_set_param(self._spectrum, spc.SPC_AMP0 + 100 * chan, range_) # Setting the target sample rate and reading the actual sample rate spc.dw_set_param(self._spectrum, spc.SPC_SAMPLERATE, self._sample_rate) spc.dw_set_param(self._spectrum, spc.SPC_CLOCKOUT, 0) real_samplerate = spc.dw_get_param(self._spectrum, spc.SPC_SAMPLERATE) self._dt = 1 / real_samplerate # Creating the buffer for data self._buff = spc.new_buffer(self._buff_size) spc.dw_def_transfer(self._spectrum, spc.SPCM_BUF_DATA, spc.SPCM_DIR_CARDTOPC, self._notify_size, self._buff, 0, self._buff_size)
[docs] def start_stream(self) -> None: """Starts the streams and saves the corresponding timestamp.""" spc.dw_set_param(self._spectrum, spc.SPC_M2CMD, spc.M2CMD_CARD_START | spc.M2CMD_CARD_ENABLETRIGGER | spc.M2CMD_DATA_STARTDMA) self._stream_t0 = time() self._stream_started = True
[docs] def get_stream(self) -> List[np.ndarray]: """Waits for data to be available, and returns it along with an array of timestamps.""" # Generating the array of timestamps start = self._stream_t0 + self._dt * self._n_points t = np.arange(start, start + (self._chunk_size - 1) * self._dt, self._dt) # Waiting until data is available spc.dw_set_param(self._spectrum, spc.SPC_M2CMD, spc.M2CMD_DATA_WAITDMA) pos = spc.dw_get_param(self._spectrum, spc.SPC_DATA_AVAIL_USER_POS) # Getting the data data = np.frombuffer(self._buff, dtype=np.int16, count=self._notify_size // 2, offset=pos).reshape( self._notify_size // (2 * len(self._channels)), len(self._channels)) ret = data.copy() del data # Updating the number of received points spc.dw_set_param(self._spectrum, spc.SPC_DATA_AVAIL_CARD_LEN, self._notify_size) self._n_points += self._chunk_size return [t, ret]
[docs] def stop_stream(self) -> None: """Stops the stream, if it was started.""" if self._stream_started: spc.dw_set_param(self._spectrum, spc.SPC_M2CMD, spc.M2CMD_CARD_STOP | spc.M2CMD_DATA_STOPDMA)
[docs] def close(self) -> None: """Closes the connection to the Spectrum, if it was opened.""" if self._spectrum is not None: self.log(logging.INFO, "closing the connection to the Spectrum") spc.vClose(self._spectrum)