Source code for embutils.serial.stream

#!/usr/bin/python
# -*- coding: utf-8 -*-
"""
Stream implementation.

:date:      2021
:author:    Christian Wiche
:contact:   cwichel@gmail.com
:license:   The MIT License (MIT)
"""
# -------------------------------------

import abc
import time
import typing as tp

from ..utils.events import EventHook
from ..utils.logger import SDK_LOG
from ..utils.threading import SDK_TP, SimpleThreadTask
from ..utils.serialized import AbstractSerialized, AbstractSerializedCodec
from ..utils.service import AbstractService
from .device import Device


# -->> Tunables <<---------------------


# -->> Definitions <<------------------


# -->> API <<--------------------------
[docs]class AbstractSerializedStreamCodec(AbstractSerializedCodec): """ AbstractSerializedCodec variant for stream usage. This class includes the logic required to decode a serialized object from a byte stream. """
[docs] @abc.abstractmethod def decode_stream(self, device: Device) -> tp.Optional[AbstractSerialized]: """ Decodes a serialized object from a byte stream. :param Device device: Stream source. :returns: Deserialized object if able, None otherwise. :rtype: Optional[AbstractSerialized] :raises ConnectionError: Device raised exception while reading. """
[docs]class Stream(AbstractService): """ This class is used to send and receive serialized objects through a serial device in asynchronous way. When a new item is received this class will notify the system using events. Available events: #. **on_receive:** This event is emitted when an object is received and deserialized from the serial device. Subscribe using callbacks with syntax:: def <callback>(item: AbstractSerialized) #. **on_connect:** This event is emitted when the system is able to connect to the device. Subscribe using callbacks with syntax:: def <callback>() #. **on_reconnect:** This event is emitted when the system is able to reconnect to the device. Subscribe using callbacks with syntax:: def <callback>() #. **on_disconnect:** This event is emitted when the system gets disconnected from the device. Subscribe using callbacks with syntax:: def <callback>() """ #: Stream reconnect period RECONNECT_PERIOD_S = 0.5 def __init__(self, device: Device, codec: AbstractSerializedStreamCodec, **kwargs) -> None: """ Class initialization. :param Device device: Serial device handler. :param AbstractSerializedStreamCodec codec: Serialized objects codec handler. """ # Service core super().__init__(**kwargs) self._device = device self._codec = codec # Service events self.on_receive = EventHook() self.on_connect = EventHook() self.on_reconnect = EventHook() self.on_disconnect = EventHook() # Internal event handlers self.on_receive += lambda item: self._transfer_debug(item=item, received=True) def __del__(self) -> None: """ Class destructor. Stops the stream and the associated serial device. """ super().__del__() del self._device @property def codec(self) -> AbstractSerializedStreamCodec: """ Serialized objects codec handler. """ return self._codec @property def device(self) -> Device: """ Serial device handler. """ return self._device @device.setter def device(self, device: Device) -> None: """ Serial device handler setter. .. note:: * This action replaces the current device. * If the new device handler has no port defined the current port is preserved. :param Device device: Serial device handler. """ self.pause() if device.port is not None: self._device = device self.resume()
[docs] def send(self, item: AbstractSerialized) -> None: """ Send a serializable item through the serial device. :param AbstractSerialized item: Item to send. """ if self.is_running: self._transfer_debug(item=item, received=False) self._device.write(data=self._codec.encode(data=item))
[docs] def _task(self) -> None: """ Stream process: #. Tries to decode items. #. If an item is available, emits an event. #. Handle disconnection status. """ # Try to receive data, if failed go to reconnection loop try: item = self._codec.decode_stream(device=self._device) if item: SDK_TP.enqueue(task=SimpleThreadTask( name=f"{self.__class__.__name__}.on_received", task=lambda: self.on_receive.emit(item=item) )) except ConnectionError: # Device disconnected... SDK_LOG.info(f"Device disconnected: {self._device}") SDK_TP.enqueue(task=SimpleThreadTask( name=f"{self.__class__.__name__}.on_disconnect", task=self.on_disconnect.emit )) if self._device_connect(): SDK_TP.enqueue(task=SimpleThreadTask( name=f"{self.__class__.__name__}.on_reconnect", task=self.on_reconnect.emit ))
[docs] def _on_start(self) -> None: """ Ensures device connected on service start. """ self._device_init()
[docs] def _on_resume(self) -> None: """ Ensures device connected on service resume. """ self._device_init(start=False)
[docs] def _on_pause(self) -> None: """ Closes device when paused. """ self._device.close()
[docs] def _on_end(self) -> None: """ Closes device when service gets terminated. """ self._device.close()
[docs] def _device_init(self, start: bool = True) -> None: """ Initializes the serial device. :param bool start: True if used on start handler. False otherwise. """ if not self._device.is_open: connected = self._device_connect() if connected and start: SDK_TP.enqueue(task=SimpleThreadTask( name=f"{self.__class__.__name__}.on_connect", task=self.on_connect.emit ))
[docs] def _device_connect(self) -> bool: """ Perform connection attempts on the serial device while the service is running. :returns: True if connection succeeded, false otherwise. :rtype: bool """ SDK_LOG.info(f"Attempting connection to {self._device}") self._device.close() while self.is_running: if self._device.open(): self._device.flush() SDK_LOG.info(f"Connected to {self._device}") return True SDK_LOG.info(f"Connection attempt on {self._device} failed.") time.sleep(self.RECONNECT_PERIOD_S) return False
[docs] @staticmethod def _transfer_debug(item: AbstractSerialized, received: bool) -> None: """ Print the sent/received items on the logger: :param AbstractSerialized item: Item that is being sent/received. :param bool received: Flag to indicate if we are sending/receiving the item. """ action = "recv" if received else "sent" SDK_LOG.debug(f"Item {action}: {item}")
# -->> Export <<----------------------- __all__ = [ "AbstractSerializedStreamCodec", "Stream", ]