Source code for crappy.inout.sager_sg_gs1700

# coding: utf-8

import time
import logging
from  warnings import warn

from .meta_inout import InOut
from .._global import OptionalModule

try:
  import serial
except (ModuleNotFoundError, ImportError):
  serial = OptionalModule("pyserial")

# Addresses of the registers of the device
CMD_READ = 0x52
CMD_WRITE = 0x43
PARAM_PROGRAM = 0x15
PARAM_SV_MV = 0x1A
STOP_MODE_PARAM = 12


[docs] class SagerSG_GS1700(InOut): """Driver for Sager SG-GS1700 furnace via AIBUS (serial). .. versionadded:: 2.0.9 """
[docs] def __init__(self, port: str, baudrate: int = 9600, timeout: float = 0.8, address: int = 0x0A) -> None: """Sets the arguments and initializes parent class. Args: port: Serial port on which to communicate with the device (e.g. "COM7" on Windows, "/dev/ttyUSB0" on Linux). baudrate: Serial baudrate. timeout: Serial timeout (seconds). address: AIBUS device address. """ warn(f"Starting from version 2.1.0, {type(self).__name__} will be moved " f"to crappy.collection. Your code that uses it will still work as " f"is, except you will now need to import crappy.collection at the " f"top of your script.", FutureWarning) self._ser: serial.Serial | None = None super().__init__() self._compensations_dict = dict() self._port = port self._baudrate = baudrate self._timeout = timeout self._address = address
[docs] def open(self) -> None: """Open the serial connection and put the furnace in hold mode.""" self._ser = serial.Serial(self._port, self._baudrate, bytesize=8, parity=serial.PARITY_NONE, stopbits=1, timeout=self._timeout) self.log(logging.INFO, f"Sager SG-GS1700 connected on {self._port}") self._write_param_u16(PARAM_PROGRAM, 5)
[docs] def get_data(self) -> tuple[float, float, float] | None: """Read PV and SV (in °C).""" now = time.time() # Cannot interpret the read values in that case if len(resp := self._read_generic(0x00)) < 4: return None pv_u16 = (resp[1] << 8) | resp[0] sv_u16 = (resp[3] << 8) | resp[2] return now, pv_u16 / 10.0, sv_u16 / 10.0
[docs] def set_cmd(self, value: float | int) -> None: """Write the temperature setpoint (°C).""" # Put the furnace in hold mode self._write_param_u16(PARAM_PROGRAM, 5) val = int(round(float(value) * 10.0)) & 0xFFFF self._write_param_u16(PARAM_SV_MV, val)
[docs] def close(self) -> None: """Close the serial connection.""" # Only close if the serial connection was first opened if self._ser is None: return # Put the furnace in stop mode self._write_param_u16(STOP_MODE_PARAM, 10) self._ser.close() self.log(logging.INFO, f"Sager SG-GS1700 disconnected from {self._port}")
def _build_frame(self, cmd: int, param: int, value_u16: int) -> bytes: """Build an AIBUS frame from a command and a value.""" val_lo = value_u16 & 0xFF val_hi = (value_u16 >> 8) & 0xFF # Compute checksum ecc = ((param << 8) + cmd + value_u16 + self._address) & 0xFFFF ecc_lo = ecc & 0xFF ecc_hi = (ecc >> 8) & 0xFF return bytes([0x8A, 0x8A, cmd, param, val_lo, val_hi, ecc_lo, ecc_hi]) def _send_and_recv(self, frame: bytes, read_len: int = 12) -> bytes: """Send a frame and read the response.""" self._ser.reset_input_buffer() self._ser.reset_output_buffer() self.log(logging.DEBUG, f"TX: {' '.join(f'{b:02X}' for b in frame)}") self._ser.write(frame) self._ser.flush() time.sleep(0.08) resp = self._ser.read(read_len) self.log(logging.DEBUG, f"RX: {' '.join(f'{b:02X}' for b in resp)}") return resp def _write_param_u16(self, param: int, value: int) -> None: """Write a 16-bit parameter.""" frame = self._build_frame(CMD_WRITE, param, value & 0xFFFF) self._send_and_recv(frame, read_len=8) def _read_generic(self, param: int = 0x00) -> bytes: """Generic read command for a given param.""" frame = self._build_frame(CMD_READ, param, 0x0000) return self._send_and_recv(frame, read_len=12)