embutils.serial.interface module
Serial interface implementation. This class is the point closest to the user. The developer is expected to complete the interface with the application-specific commands.
- date
2021
- author
Christian Wiche
- contact
- license
The MIT License (MIT)
- embutils.serial.interface.CBSerialized2Bool
Callback definition: AbstractSerialized -> bool
alias of Callable[[embutils.utils.serialized.AbstractSerialized], bool]
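A function matches this alias when it takes one item and returns a bool. The following is a minimal sketch, not code from embutils: `AbstractSerialized` is redefined here as an empty stand-in so the snippet runs on its own, and `Ack` and `is_ack` are hypothetical names used only for illustration.

```python
from typing import Callable


class AbstractSerialized:
    """Stand-in for embutils.utils.serialized.AbstractSerialized (assumption)."""


class Ack(AbstractSerialized):
    """Hypothetical item type used only for this illustration."""


# Same shape as the documented alias: AbstractSerialized -> bool
CBSerialized2Bool = Callable[[AbstractSerialized], bool]


def is_ack(item: AbstractSerialized) -> bool:
    """Returns True when the received item is an Ack."""
    return isinstance(item, Ack)


# The annotation documents that is_ack is usable wherever the alias is expected.
check: CBSerialized2Bool = is_ack
```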
- class embutils.serial.interface.Interface(stream: embutils.serial.stream.Stream)[source]
Bases: object
Serial command interface implementation. This class should implement all the methods needed to interact with the target device.
Available events:
on_receive: This event is emitted when an object is received and deserialized from the serial device. Subscribe using callbacks with syntax:
def <callback>(item: AbstractSerialized)
on_connect: This event is emitted when the system is able to connect to the device. Subscribe using callbacks with syntax:
def <callback>()
on_reconnect: This event is emitted when the system is able to reconnect to the device. Subscribe using callbacks with syntax:
def <callback>()
on_disconnect: This event is emitted when the system gets disconnected from the device. Subscribe using callbacks with syntax:
def <callback>()
Class initialization.
- Parameters
stream (Stream) – Stream used to run the interface.
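The callback syntaxes listed for the events can be illustrated with a short sketch. This page does not show how callbacks are attached to the events, so the `Event` class and its `subscribe`/`emit` methods below are hypothetical stand-ins, not the embutils event API; only the callback signatures follow the documentation. `AbstractSerialized` is again an empty stand-in.

```python
from typing import Callable, List


class AbstractSerialized:
    """Stand-in for embutils.utils.serialized.AbstractSerialized (assumption)."""


class Event:
    """Hypothetical minimal event: stores callbacks and calls them on emit.
    This is NOT the event class used by embutils."""

    def __init__(self) -> None:
        self._callbacks: List[Callable[..., None]] = []

    def subscribe(self, callback: Callable[..., None]) -> None:
        self._callbacks.append(callback)

    def emit(self, *args) -> None:
        for callback in self._callbacks:
            callback(*args)


log: List[str] = []


def handle_receive(item: AbstractSerialized) -> None:
    # Matches the documented on_receive syntax: one item argument.
    log.append(f"received {type(item).__name__}")


def handle_connect() -> None:
    # Matches the documented on_connect syntax: no arguments.
    log.append("connected")


on_receive, on_connect = Event(), Event()
on_receive.subscribe(handle_receive)
on_connect.subscribe(handle_connect)

# Simulate what an interface would do when the device connects and sends data.
on_connect.emit()
on_receive.emit(AbstractSerialized())
```

The same no-argument shape as `handle_connect` applies to on_reconnect and on_disconnect callbacks.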
- PERIOD_PULL_S = 0.005
Interface command pull period, in seconds.
- TIMEOUT_RESPONSE_S = 0.5
Interface command response timeout, in seconds.
- _send_none(send: embutils.utils.serialized.AbstractSerialized) None[source]
Sends an item without waiting for a response.
- Parameters
send (AbstractSerialized) – Packet to be sent.
- _send_recv(send: embutils.utils.serialized.AbstractSerialized, logic: Callable[[embutils.utils.serialized.AbstractSerialized], bool], timeout: Optional[float] = None) Optional[embutils.utils.serialized.AbstractSerialized][source]
Sends an item and waits for a response.
- Parameters
send (AbstractSerialized) – Item to be sent.
logic (CBSerialized2Bool) – Response logic. Decides whether a received item counts as the response. If None, the item is only sent.
timeout (float) – Response timeout in seconds. Defaults to the interface timeout setting.
- Returns
None if the timeout expires or no response is detected; the response item otherwise.
- Return type
Optional[AbstractSerialized]
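The send-and-wait behavior described above can be sketched as a polling loop. This is an illustration under stated assumptions, not the library's implementation: `wait_response` and the `received` queue are hypothetical, plain Python objects stand in for items, and the two constants simply reuse the values of PERIOD_PULL_S and TIMEOUT_RESPONSE_S documented above.

```python
import time
from collections import deque
from typing import Callable, Deque, Optional

PERIOD_PULL_S = 0.005       # pull period, value taken from the class attribute
TIMEOUT_RESPONSE_S = 0.5    # default response timeout, value taken from above


def wait_response(
    received: Deque[object],
    logic: Callable[[object], bool],
    timeout: Optional[float] = None,
) -> Optional[object]:
    """Polls `received` until an item satisfies `logic` or the timeout expires."""
    limit = TIMEOUT_RESPONSE_S if timeout is None else timeout
    deadline = time.monotonic() + limit
    while True:
        # Drain everything that arrived since the last poll.
        while received:
            item = received.popleft()
            if logic(item):
                return item
        if time.monotonic() >= deadline:
            return None
        time.sleep(PERIOD_PULL_S)


# A matching item is returned; non-matching items are discarded.
inbox: Deque[object] = deque(["noise", "ack"])
match = wait_response(inbox, lambda item: item == "ack")

# With nothing to receive, the call gives up after the timeout.
miss = wait_response(deque(), lambda item: True, timeout=0.02)
```

Using a monotonic clock for the deadline keeps the timeout unaffected by system clock adjustments.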
- join() None[source]
Keeps the process alive while the interface is working. The loop exits when the interface serial process is stopped.
- property stream: embutils.serial.stream.Stream
Stream handler.
- property timeout: float
Message response timeout.
- transmit(send: embutils.utils.serialized.AbstractSerialized, logic: Optional[Callable[[embutils.utils.serialized.AbstractSerialized], bool]] = None, timeout: Optional[float] = None) Optional[embutils.utils.serialized.AbstractSerialized][source]
Send an item and, if required, wait for a response that complies with the defined logic.
The response detection logic should have the following syntax:
def <callback>(item: AbstractSerialized) -> bool
Where the input is the item received from the device.
- Parameters
send (AbstractSerialized) – Packet to be sent.
logic (CBSerialized2Bool) – Response logic. Decides whether a received item counts as the response. If None, the item is only sent.
timeout (float) – Response timeout in seconds. Defaults to the interface timeout setting.
- Returns
None if the timeout expires or no response is detected; the response item otherwise.
- Return type
Optional[AbstractSerialized]
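A call to transmit with and without response logic might look like the sketch below. To keep it runnable without a device, it uses `LoopbackInterface`, a hypothetical stand-in that mirrors only the documented `transmit` signature and answers every packet itself. `Packet` and `is_reply` are likewise illustrative names, not part of embutils.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class Packet:
    """Hypothetical item; stands in for an AbstractSerialized subclass."""
    command: int
    payload: bytes = b""


class LoopbackInterface:
    """Hypothetical stand-in mirroring only the documented transmit() signature.
    It answers every packet with a reply carrying the same command."""

    def transmit(
        self,
        send: Packet,
        logic: Optional[Callable[[Packet], bool]] = None,
        timeout: Optional[float] = None,
    ) -> Optional[Packet]:
        if logic is None:
            return None  # send-only: no response is awaited
        reply = Packet(command=send.command, payload=b"ok")
        return reply if logic(reply) else None


request = Packet(command=0x01)


def is_reply(item: Packet) -> bool:
    # Response logic: accept only items that answer our command.
    return item.command == request.command


iface = LoopbackInterface()
reply = iface.transmit(send=request, logic=is_reply, timeout=0.1)
sent_only = iface.transmit(send=request)
rejected = iface.transmit(send=request, logic=lambda item: item.command == 0x02)
```

Keeping the matching rule in a small named function such as `is_reply` makes the acceptance criterion easy to test separately from the transport.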