embutils.serial.interface module

Serial interface implementation. This class is the point closest to the user. The developer is expected to complete the interface with the application-specific commands.

date

2021

author

Christian Wiche

contact

cwichel@gmail.com

license

The MIT License (MIT)

embutils.serial.interface.CBSerialized2Bool

Callback definition: AbstractSerialized -> bool.

alias of Callable[[embutils.utils.serialized.AbstractSerialized], bool]

class embutils.serial.interface.Interface(stream: embutils.serial.stream.Stream)

Bases: object

Serial command interface implementation. This class should implement all the methods to interact with the target device.

Available events:

  1. on_receive: This event is emitted when an object is received and deserialized from the serial device. Subscribe using callbacks with syntax:

    def <callback>(item: AbstractSerialized)
    
  2. on_connect: This event is emitted when the system is able to connect to the device. Subscribe using callbacks with syntax:

    def <callback>()
    
  3. on_reconnect: This event is emitted when the system is able to reconnect to the device. Subscribe using callbacks with syntax:

    def <callback>()
    
  4. on_disconnect: This event is emitted when the system gets disconnected from the device. Subscribe using callbacks with syntax:

    def <callback>()
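
The callback signatures listed above can be sketched as plain Python functions. This is a minimal illustration, not embutils code: `FakeItem` stands in for an `AbstractSerialized` object, and `SimpleEvent` with its `subscribe` and `emit` methods is a hypothetical event container, since the subscription mechanism is not described here.

```python
from typing import Callable, List


class FakeItem:
    """Hypothetical stand-in for an AbstractSerialized item."""

    def __init__(self, payload: bytes) -> None:
        self.payload = payload


class SimpleEvent:
    """Hypothetical event container; the real subscription API may differ."""

    def __init__(self) -> None:
        self._callbacks: List[Callable[..., None]] = []

    def subscribe(self, callback: Callable[..., None]) -> None:
        self._callbacks.append(callback)

    def emit(self, *args) -> None:
        for callback in self._callbacks:
            callback(*args)


received: List[bytes] = []
states: List[str] = []


def handle_receive(item: FakeItem) -> None:
    # on_receive callbacks take the deserialized item as their only argument.
    received.append(item.payload)


def handle_connect() -> None:
    # on_connect, on_reconnect and on_disconnect callbacks take no arguments.
    states.append("connected")


def handle_disconnect() -> None:
    states.append("disconnected")


on_receive, on_connect, on_disconnect = SimpleEvent(), SimpleEvent(), SimpleEvent()
on_receive.subscribe(handle_receive)
on_connect.subscribe(handle_connect)
on_disconnect.subscribe(handle_disconnect)

# Simulate the sequence of events a device session might produce.
on_connect.emit()
on_receive.emit(FakeItem(b"\x01\x02"))
on_disconnect.emit()
```

Only the function signatures are taken from the event descriptions; everything else in the sketch is scaffolding to make it runnable.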
    

Class initialization.

Parameters

stream (Stream) – Stream used to run the interface.

PERIOD_PULL_S = 0.005

Interface command pull period, in seconds.

TIMEOUT_RESPONSE_S = 0.5

Interface command response timeout, in seconds.

_send_none(send: embutils.utils.serialized.AbstractSerialized) -> None

Sends an item without waiting for a response.

Parameters

send (AbstractSerialized) – Packet to be sent.

_send_recv(send: embutils.utils.serialized.AbstractSerialized, logic: Callable[[embutils.utils.serialized.AbstractSerialized], bool], timeout: Optional[float] = None) -> Optional[embutils.utils.serialized.AbstractSerialized]

Sends an item and waits for a response.

Parameters
  • send (AbstractSerialized) – Item to be sent.

  • logic (CBSerialized2Bool) – Response logic. Decides whether a received item counts as the response. If None, the item is only sent.

  • timeout (float) – Response timeout. Defaults to the interface timeout setting.

Returns

None if the timeout expires or no response is detected; otherwise, the response item.

Return type

Optional[AbstractSerialized]
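
A send-and-wait step of this kind can be sketched as a polling loop. The snippet below is an illustration under assumptions, not the embutils implementation: `wait_for_response` and the `inbox` queue are hypothetical names, received items are modelled as `bytes`, and the two constants only mirror the values documented for `PERIOD_PULL_S` and `TIMEOUT_RESPONSE_S`.

```python
import time
from collections import deque
from typing import Callable, Deque, Optional

PERIOD_PULL_S = 0.005      # mirrors Interface.PERIOD_PULL_S
TIMEOUT_RESPONSE_S = 0.5   # mirrors Interface.TIMEOUT_RESPONSE_S


def wait_for_response(
    inbox: Deque[bytes],
    logic: Callable[[bytes], bool],
    timeout: Optional[float] = None,
) -> Optional[bytes]:
    """Poll `inbox` until an item satisfies `logic` or the timeout expires."""
    limit = TIMEOUT_RESPONSE_S if timeout is None else timeout
    deadline = time.monotonic() + limit
    while time.monotonic() < deadline:
        # Drain everything received so far; items rejected by the logic
        # are discarded in this sketch.
        while inbox:
            item = inbox.popleft()
            if logic(item):
                return item
        time.sleep(PERIOD_PULL_S)
    return None


# A matching item is returned as soon as the logic accepts it.
match = wait_for_response(deque([b"noise", b"ACK:7"]),
                          lambda item: item.startswith(b"ACK"))

# With nothing to receive, the call gives up after the timeout.
miss = wait_for_response(deque(), lambda item: True, timeout=0.02)
```

Using `time.monotonic()` for the deadline keeps the timeout unaffected by system clock adjustments.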

join() -> None

Keeps the process alive while the interface is working. The loop exits if the interface serial process is stopped.

stop() -> None

Stops the stream process.
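
The relationship between `join()` and `stop()` can be pictured with a small threading sketch. It assumes a background thread as the "stream process"; the `Worker` class and its `is_running` method are hypothetical and say nothing about how embutils implements this internally.

```python
import threading
import time


class Worker:
    """Hypothetical stand-in for the stream process behind an interface."""

    def __init__(self) -> None:
        self._stop_flag = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def _run(self) -> None:
        # Background loop that runs until stop() is requested.
        while not self._stop_flag.is_set():
            time.sleep(0.005)

    def is_running(self) -> bool:
        return self._thread.is_alive()

    def stop(self) -> None:
        # Ask the background loop to finish.
        self._stop_flag.set()

    def join(self) -> None:
        # Block the caller while the background loop is alive.
        while self._thread.is_alive():
            time.sleep(0.005)


worker = Worker()
threading.Timer(0.05, worker.stop).start()  # request a stop shortly after start
worker.join()                               # returns once the worker has stopped
running_after_join = worker.is_running()
```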

property stream: embutils.serial.stream.Stream

Stream handler.

property timeout: float

Message response timeout.

transmit(send: embutils.utils.serialized.AbstractSerialized, logic: Optional[Callable[[embutils.utils.serialized.AbstractSerialized], bool]] = None, timeout: Optional[float] = None) -> Optional[embutils.utils.serialized.AbstractSerialized]

Send an item and, if required, wait for a response that complies with the defined logic.

The response detection logic should have the following syntax:

def <callback>(item: AbstractSerialized) -> bool

Where the input is the item received from the device.

Parameters
  • send (AbstractSerialized) – Packet to be sent.

  • logic (CBSerialized2Bool) – Response logic. Decides whether a received item counts as the response. If None, the item is only sent.

  • timeout (float) – Response timeout. Defaults to the interface timeout setting.

Returns

None if the timeout expires or no response is detected; otherwise, the response item.

Return type

Optional[AbstractSerialized]
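
A response logic callback for `transmit` can be sketched as follows. The example is illustrative only: `Packet`, `is_reply_to` and `FakeInterface` are hypothetical names that mimic the documented call shape (`send`, `logic`, `timeout`) without touching a real device, and matching replies by a command id is an assumed convention.

```python
from dataclasses import dataclass
from typing import Callable, List, Optional


@dataclass
class Packet:
    """Hypothetical item type; stands in for an AbstractSerialized subclass."""
    command: int
    payload: bytes = b""


def is_reply_to(request: Packet) -> Callable[[Packet], bool]:
    """Build a response logic callback that matches replies by command id."""
    def logic(item: Packet) -> bool:
        return item.command == request.command
    return logic


class FakeInterface:
    """Hypothetical stand-in that mimics only the shape of transmit()."""

    def __init__(self, incoming: List[Packet]) -> None:
        self._incoming = incoming
        self.sent: List[Packet] = []

    def transmit(
        self,
        send: Packet,
        logic: Optional[Callable[[Packet], bool]] = None,
        timeout: Optional[float] = None,  # accepted but unused in this fake
    ) -> Optional[Packet]:
        self.sent.append(send)
        if logic is None:
            return None  # send only, no response expected
        # Return the first queued item accepted by the logic, if any.
        return next((item for item in self._incoming if logic(item)), None)


iface = FakeInterface(incoming=[Packet(0x10), Packet(0x02, b"ok")])
request = Packet(0x02)

# Wait for a reply that carries the same command id as the request.
reply = iface.transmit(send=request, logic=is_reply_to(request), timeout=0.1)

# Without a logic callback, the item is only sent.
fire_and_forget = iface.transmit(send=Packet(0x05))
```

Building the callback with a closure keeps the matching rule next to the request it belongs to, which is convenient when several commands share one interface.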