#!/usr/bin/python
# -*- coding: utf-8 -*-
"""
Serial interface implementation. This class represents the closer point to the
user. Here the developer needs to complete the interface with the specific
applications commands.
:date: 2021
:author: Christian Wiche
:contact: cwichel@gmail.com
:license: The MIT License (MIT)
"""
# -------------------------------------
import time
import typing as tp
from ..utils.logger import SDK_LOG
from ..utils.serialized import AbstractSerialized
from ..utils.time import Timer
from .stream import Stream
# -->> Tunables <<---------------------
# -->> Definitions <<------------------
#: CallBack definition. AbstractSerialized -> bool
CBSerialized2Bool = tp.Callable[[AbstractSerialized], bool]
# -->> API <<--------------------------
[docs]class Interface:
"""
Serial command interface implementation.
This class should implement all the methods to interact with the target device.
Available events:
#. **on_receive:** This event is emitted when an object is received and
deserialized from the serial device. Subscribe using callbacks with
syntax::
def <callback>(item: AbstractSerialized)
#. **on_connect:** This event is emitted when the system is able to
connect to the device. Subscribe using callbacks with syntax::
def <callback>()
#. **on_reconnect:** This event is emitted when the system is able to
reconnect to the device. Subscribe using callbacks with syntax::
def <callback>()
#. **on_disconnect:** This event is emitted when the system gets
disconnected from the device. Subscribe using callbacks with syntax::
def <callback>()
"""
#: Interface command pull period
PERIOD_PULL_S = 0.005
#: Interface command response timeout
TIMEOUT_RESPONSE_S = 0.5
def __init__(self, stream: Stream) -> None:
"""
Class initialization.
:param Stream stream: Stream used to run the interface.
"""
# Response timeout configuration
self._timeout = self.TIMEOUT_RESPONSE_S
# Initialize stream
self._stream = stream
# Attach stream events
self.on_connect = self._stream.on_connect
self.on_reconnect = self._stream.on_reconnect
self.on_disconnect = self._stream.on_disconnect
self.on_receive = self._stream.on_receive
SDK_LOG.info(f"Interface initialized on: {self._stream.device}")
@property
def stream(self) -> Stream:
"""
Stream handler.
"""
return self._stream
@property
def timeout(self) -> float:
"""
Message response timeout.
"""
return self._timeout
@timeout.setter
def timeout(self, timeout: float) -> None:
"""
Message response timeout setter.
:param float timeout: Timeout in seconds.
:raises ValueError: Timeout value needs to be greater than zero.
"""
if timeout <= 0.0:
raise ValueError("The response timeout needs to be greater than zero.")
self._timeout = timeout
[docs] def transmit(self,
send: AbstractSerialized,
logic: tp.Optional[CBSerialized2Bool] = None,
timeout: float = None) -> tp.Optional[AbstractSerialized]:
"""
Send an item and, if required, wait for a response that complies with
the defined logic.
The response detection logic should have the following syntax::
def <callback>(AbstractSerialized) -> bool:
Where the input is the item received from the device.
:param AbstractSerialized send: Packet to be sent.
:param CBSerialized2Bool logic: Response logic. Defines if a response is detected. If none then only sends.
:param float timeout: Response timeout. By default, the interface setting.
:returns: None if timeout or no response is detected, response item otherwise.
:rtype: Optional[AbstractSerialized]
"""
if logic:
return self._send_recv(send=send, logic=logic, timeout=timeout)
self._send_none(send=send)
return None
[docs] def stop(self) -> None:
"""
Stops the stream process.
"""
self._stream.stop()
[docs] def join(self) -> None:
"""
Maintains the process alive while the interface is working. The loop will
exit if the interface serial process gets stopped.
"""
self._stream.join()
[docs] def _send_none(self,
send: AbstractSerialized) -> None:
"""
Sends an item and dont care about the response.
:param AbstractSerialized send: Packet to be sent.
"""
# Send item and return
SDK_LOG.debug("Sending item...")
self._stream.send(item=send)
SDK_LOG.debug("Response not needed.")
[docs] def _send_recv(self,
send: AbstractSerialized,
logic: CBSerialized2Bool,
timeout: float = None) -> tp.Optional[AbstractSerialized]:
"""
Sends an item and wait for response.
:param AbstractSerialized send: Item to be sent.
:param CBSerialized2Bool logic: Response logic. Defines if a response is detected. If none then only sends.
:param float timeout: Response timeout. By default, the interface setting.
:returns: None if timeout or no response is detected, response item otherwise.
:rtype: Optional[AbstractSerialized]
"""
# Prepare data
recv = None
timeout = self._timeout if (timeout is None) else timeout
# Receive logic callback
def on_received(item: AbstractSerialized) -> None:
nonlocal recv
if logic(item):
recv = item
# Prepare response logic
self.on_receive += on_received
# Prepare and send
SDK_LOG.debug("Sending item...")
self._stream.send(item=send)
# Wait for response
SDK_LOG.debug("Waiting for response...")
tim = Timer()
while not recv and (tim.elapsed() < timeout):
time.sleep(self.PERIOD_PULL_S)
self.on_receive -= on_received
# Check data
state = "Received" if recv else "Timeout"
SDK_LOG.debug(f"Item response: {state} after {tim.elapsed():.03f}[s]")
return recv
# -->> Export <<-----------------------
__all__ = [
"CBSerialized2Bool",
"Interface",
]