Source code for embutils.serial.device

#!/usr/bin/python
# -*- coding: utf-8 -*-
"""
Serial device implementation classes.

:date:      2021
:author:    Christian Wiche
:contact:   cwichel@gmail.com
:license:   The MIT License (MIT)
"""
# -------------------------------------

import threading as th
import typing as tp

import serial
import serial.tools.list_ports

from ..utils.events import EventHook
from ..utils.enum import IntEnum
from ..utils.service import AbstractService
from ..utils.threading import SDK_TP, SimpleThreadTask, sync
from ..utils.time import Timer


# -->> Tunables <<---------------------


# -->> Definitions <<------------------


# -->> API <<--------------------------
class Device:
    """
    Serial device implementation wrapper.

    Bundles a ``serial.Serial`` handler with the USB ID of its port and allows
    the easy use of looped serial ports (``loop://``) for testing.
    """
    #: Default device ID (used by looped devices)
    DEF_ID = 0xDEADBEEF
    #: Default device settings
    DEF_SETTINGS = {
        "baudrate": 115200,
        "bytesize": 8,
        "stopbits": 1,
        "parity":   "N",
        "timeout":  0.1
    }

    def __init__(self, port: tp.Optional[str] = None, looped: bool = False, settings: tp.Optional[dict] = None) -> None:
        """
        Device configuration. Applies the serial device settings to the
        selected port. The port is NOT opened here, see :meth:`open`.

        :param str port:        Port name. Ignored when ``looped`` is enabled.
        :param bool looped:     Enables the test mode (looped serial).
        :param dict settings:   Serial device configuration. Anything that is
                                not a dict falls back to :attr:`DEF_SETTINGS`.

        :raises ValueError: Port is not provided, it is not listed by the
                            system, or it doesn't expose a USB VID/PID.
        """
        # Device core
        self._lock = th.RLock()     # Used by the @sync decorated methods
        self._looped = looped
        if not isinstance(settings, dict):
            settings = self.DEF_SETTINGS

        # Create serial
        if looped:
            self._serial = serial.serial_for_url(url="loop://", do_not_open=True, exclusive=True)
            self._id = self.DEF_ID
        else:
            # Check port
            if port is None:
                raise ValueError("Port is required!")

            # Get ID
            _id = self._id_from_port(port=port)
            if _id is None:
                raise ValueError(f"Port {port} is not connected!")

            # Initialize (assigning the port on a bare Serial() doesn't open it)
            self._serial = serial.Serial()
            self._serial.port = port
            self._id = _id

        # Configure serial
        self._serial.apply_settings(d=settings)

    def __repr__(self) -> str:
        """
        Representation string.
        """
        return f"{self.__class__.__name__}(port={self.port}, id=0x{self.id:08X}, looped={self._looped})"

    def __eq__(self, other: object) -> bool:
        """
        Check if the object is equal to the input.
        Two devices are equal when both port name and ID match.
        """
        if isinstance(other, self.__class__):
            return (self.port == other.port) and (self.id == other.id)
        return False

    def __ne__(self, other: object) -> bool:
        """
        Check if the object is different to the input.
        """
        return not self.__eq__(other)

    def __hash__(self) -> int:
        """
        Hash consistent with :meth:`__eq__`. Defining ``__eq__`` alone would
        leave the class unhashable (unusable in sets or as dict keys).
        """
        return hash((self.port, self.id))

    @property
    def port(self) -> str:
        """
        Device port name.
        """
        return self._serial.port

    @property
    def id(self) -> int:
        """
        Device USB ID, packed as ``(VID << 16) | PID``.
        Looped devices report :attr:`DEF_ID`.
        """
        return self._id

    @property
    def serial(self) -> serial.Serial:
        """
        Serial handler.
        """
        return self._serial

    @property
    def is_open(self) -> bool:
        """
        Returns if the serial device is open.
        """
        return self._serial.is_open

    @sync(lock_name="_lock")
    def open(self) -> bool:
        """
        Tries to open the serial port.

        :returns: True if the port is open (or was already open), false if
                  opening it raised a ``serial.SerialException``.
        :rtype: bool
        """
        # Nothing to do if the port is already open
        if self.is_open:
            return True

        # Try to open it otherwise
        try:
            self._serial.open()
        except serial.SerialException:
            return False
        return True

    @sync(lock_name="_lock")
    def close(self) -> None:
        """
        Flushes and closes the serial port. Does nothing if the port is
        not open.
        """
        if self.is_open:
            self.flush()
            self._serial.close()

    @sync(lock_name="_lock")
    def flush(self) -> None:
        """
        Flushes the serial buffer. Does nothing if the port is not open.
        """
        if self.is_open:
            self._serial.flush()

    @sync(lock_name="_lock")
    def write(self, data: bytearray) -> int:
        """
        Writes the given data through the serial port.

        :param bytearray data: Bytes to be sent through the serial port.

        :returns: Number of bytes sent. Zero if the port is not open.
        :rtype: int
        """
        if self.is_open:
            return self._serial.write(data=data)
        return 0

    @sync(lock_name="_lock")
    def read(self, size: int = 1) -> tp.Optional[bytes]:
        """
        Reads up to a fixed number of bytes from the serial buffer. The read
        returns early if the configured timeout expires before completion.

        :param int size: Number of bytes to read.

        :returns: None if the port is not open or a serial error occurred.
                  Otherwise the received bytes, as returned by the serial
                  handler (may be shorter than ``size``, or empty, on timeout).
        :rtype: Optional[bytes]
        """
        if not self.is_open:
            return None
        try:
            return self._serial.read(size=size)
        except serial.SerialException:
            return None

    @sync(lock_name="_lock")
    def read_until(self, expected: bytes = b"\n", size: tp.Optional[int] = None) -> tp.Optional[bytes]:
        """
        Reads bytes from the serial buffer until the expected sequence is found,
        the received bytes exceed the specified limit, or a timeout is reached.

        :param bytes expected:  Stop read condition.
        :param int size:        Read array size limit. None means no limit.

        :returns: None if the port is not open or a serial error occurred.
                  Otherwise the received bytes, as returned by the serial
                  handler (may be empty on timeout).
        :rtype: Optional[bytes]
        """
        if not self.is_open:
            return None
        try:
            return self._serial.read_until(expected=expected, size=size)
        except serial.SerialException:
            return None

    @staticmethod
    def _id_from_port(port: str) -> tp.Optional[int]:
        """
        Retrieves the USB ID for the given port.

        :param str port: Port name.

        :returns: USB ID packed as ``(VID << 16) | PID``. None if the port is
                  not listed by the system or it has no VID/PID.
        :rtype: Optional[int]
        """
        for dev in serial.tools.list_ports.comports():
            if dev.device != port:
                continue
            # A listed port can lack VID/PID; shifting None would raise a
            # TypeError instead of letting the caller report a ValueError.
            if dev.vid is None or dev.pid is None:
                return None
            return (dev.vid << 16) | dev.pid
        return None
class DeviceList(tp.List[Device]):
    """
    List of serial devices.

    Adds helpers to scan the system for devices, compare two lists and
    filter a list by port name and/or device ID.
    """

    def compare(self, other: "DeviceList") -> "DeviceList":
        """
        Get the differences between two serial device lists.

        The longer list is used as reference (``other`` when both have the
        same length); the result holds the reference items that are missing
        from the shorter one.

        :param DeviceList other: List to compare with.

        :returns: List containing the differences. Empty if both are equal.
        :rtype: DeviceList
        """
        # Equal lists have nothing to report
        if self == other:
            return DeviceList()

        # Pick the longer list as reference
        if len(self) > len(other):
            reference, against = self, other
        else:
            reference, against = other, self

        # Keep what the reference has and the other list lacks
        return DeviceList([dev for dev in reference if dev not in against])

    def filter(self, port: str = None, dev_id: int = None) -> "DeviceList":
        """
        Filter serial devices from a given list.

        :param str port:    Port name to be filtered.
        :param int dev_id:  Device ID to be filtered.

        :returns: List containing the filtered devices. When no criteria is
                  given the list itself is returned (not a copy).
        :rtype: DeviceList
        """
        by_port = port is not None
        by_id = dev_id is not None

        # No criteria: hand back the same object
        if not (by_port or by_id):
            return self

        # Single pass, port criteria is evaluated before the ID one
        return DeviceList([
            dev for dev in self
            if (not by_port or dev.port == port) and (not by_id or dev.id == dev_id)
        ])

    @staticmethod
    def scan() -> "DeviceList":
        """
        Scan the system and return a list with the connected serial devices.
        Ports without VID or PID are not considered.

        :returns: List with devices.
        :rtype: DeviceList
        """
        found = DeviceList()
        for info in serial.tools.list_ports.comports():
            # Items without a full ID are skipped
            has_id = (info.vid is not None) and (info.pid is not None)
            if not has_id:
                continue

            # Ports rejected by the Device constructor are skipped too
            try:
                candidate = Device(port=info.device)
            except ValueError:
                continue
            found.append(candidate)
        return found
class DeviceScanner(AbstractService):
    """
    Serial device scanner service.

    Periodically scans the connected serial devices and reports the changes.

    Available events:

    #.  **on_scan_period:** This event is emitted after every scan.
        Subscribe using callbacks with syntax::

            def <callback>()

    #.  **on_list_change:** This event is emitted when a change is detected
        on the connected device list. Subscribe using callbacks with syntax::

            def <callback>(event: DeviceScanner.Event, changes: DeviceList)

    """

    class Event(IntEnum):
        """
        Serial device scanner event definitions.
        """
        SD_NO_EVENT = 0x00          # No event
        SD_LIST_CHANGED = 0x01      # Serial devices list has changed
        SD_PLUGGED_SINGLE = 0x02    # A single device was plugged
        SD_PLUGGED_MULTI = 0x03     # Multiple devices were plugged
        SD_REMOVED_SINGLE = 0x04    # A single device was unplugged
        SD_REMOVED_MULTI = 0x05     # Multiple devices were unplugged

        @staticmethod
        def get_event(old: DeviceList, new: DeviceList) -> tp.Tuple["DeviceScanner.Event", DeviceList]:
            """
            Compares two serial device lists and classifies the difference.

            :param DeviceList old: Last scanned device list.
            :param DeviceList new: New scanned device list.

            :returns: Serial device scanner event and device difference list.
            :rtype: Tuple[DeviceScanner.Event, DeviceList]
            """
            kinds = DeviceScanner.Event
            change = old.compare(other=new)

            # Nothing differs
            if not change:
                return kinds.SD_NO_EVENT, change

            several = len(change) > 1
            count_old, count_new = len(old), len(new)

            # List grew: devices were plugged
            if count_new > count_old:
                return (kinds.SD_PLUGGED_MULTI if several else kinds.SD_PLUGGED_SINGLE), change

            # List shrank: devices were removed
            if count_old > count_new:
                return (kinds.SD_REMOVED_MULTI if several else kinds.SD_REMOVED_SINGLE), change

            # Same size but different content
            return kinds.SD_LIST_CHANGED, change

    #: Task execution period.
    TASK_PERIOD_S = 0.5

    def __init__(self, period: float = TASK_PERIOD_S, **kwargs) -> None:
        """
        Class initialization.

        :param float period: Define the periodicity of the scanner executions
                             in seconds.
        :param kwargs:       Forwarded untouched to the base service.
        """
        # Service core
        super().__init__(**kwargs)
        self._timer = Timer()
        self._period = period
        self._devices = DeviceList()    # Devices found on the last scan
        self._changes = DeviceList()    # Device list used as baseline to detect changes

        # Service events
        self.on_scan_period = EventHook()
        self.on_list_change = EventHook()

    @property
    def devices(self) -> DeviceList:
        """
        Connected serial devices list.
        """
        return self._devices

    @property
    def period(self) -> float:
        """
        Scanner period in seconds.
        """
        return self._period

    @period.setter
    def period(self, value: float) -> None:
        """
        Scanner period setter.

        :param float value: Period in seconds.
        """
        self._period = value

    def _task(self) -> None:
        """
        Serial scanner process. Once the timer reports more than the
        configured period:

        #. Restart the timer.
        #. Run a scan (see :meth:`_scan`).
        """
        if not (self._timer.elapsed() > self._period):
            return
        self._timer.start()
        self._scan()

    def _on_start(self) -> None:
        """
        Load the device list, start the timer and run the first scan.
        """
        self._devices = DeviceList.scan()
        self._timer.start()
        self._scan()

    def _scan(self) -> None:
        """
        Scanner functionality:

        #. Read the connected devices.
        #. If there are ``on_list_change`` subscribers, compare against the
           baseline list and enqueue the change event when required.
        #. Update the current device list and enqueue ``on_scan_period``.
        """
        owner = self.__class__.__name__

        # Get the connected devices
        current = DeviceList.scan()

        # Differences are only computed when someone is listening for them
        if not self.on_list_change.empty:
            event, changes = self.Event.get_event(old=self._changes, new=current)

            if event != self.Event.SD_NO_EVENT:
                def _notify():
                    return self.on_list_change.emit(event=event, changes=changes)

                SDK_TP.enqueue(task=SimpleThreadTask(
                    name=f"{owner}.on_list_change",
                    task=_notify
                ))
            self._changes = current

        # Update the connected devices list
        self._devices = current
        SDK_TP.enqueue(task=SimpleThreadTask(
            name=f"{owner}.on_scan_period",
            task=self.on_scan_period.emit
        ))
# -->> Export <<----------------------- __all__ = [ "Device", "DeviceList", "DeviceScanner", ]