#!/usr/bin/python
# -*- coding: utf-8 -*-
"""
Service implementation.
:date: 2021
:author: Christian Wiche
:contact: cwichel@gmail.com
:license: The MIT License (MIT)
"""
# -------------------------------------
import abc
import threading as th
import time
from .logger import SDK_LOG
from .threading import SDK_TP, SimpleThreadTask
# -->> Tunables <<---------------------
# -->> Definitions <<------------------
# -->> API <<--------------------------
class AbstractService(abc.ABC):
    """
    Service definition abstraction.

    Use this class to simplify the service definition and handling.
    Subclasses must implement :meth:`_task`; the ``_on_start``, ``_on_resume``,
    ``_on_pause`` and ``_on_end`` hooks are optional.
    """

    #: Task execution delay.
    TASK_DELAY_S = 0.001

    def __init__(self, delay: float = TASK_DELAY_S) -> None:
        """
        Class initialization.

        Creates the service state and hands :meth:`_service` to ``SDK_TP``
        wrapped in a ``SimpleThreadTask`` named after the concrete class.

        :param float delay: Time between service task executions.
        """
        # Configure service operation
        self._delay = delay
        self._alive = True
        self._resumed = False
        self._ended = th.Event()        # Set when the service gets terminated.
        self._running = th.Event()      # Set when the service is running.
        self._executed = th.Event()     # Set after every main task execution.

        # Start
        self._running.set()
        SDK_TP.enqueue(task=SimpleThreadTask(
            name=f"{self.__class__.__name__}",
            task=self._service
        ))

    def __del__(self):
        """
        Class destructor.

        Ensures to stop the service correctly on deletion.
        """
        # A subclass may fail before calling ``super().__init__()``; in that
        # case there is no service state to stop or wait for.
        if getattr(self, "_ended", None) is None:
            return
        self.stop()
        self.join()

    @property
    def is_alive(self) -> bool:
        """
        Returns if the service is alive.
        """
        return self._alive

    @property
    def is_running(self) -> bool:
        """
        Returns if the service is running (alive and not paused).
        """
        return self._alive and self._running.is_set()

    @property
    def delay(self) -> float:
        """
        Service main task execution delay in seconds.
        """
        return self._delay

    @delay.setter
    def delay(self, value: float) -> None:
        """
        Service main task execution delay setter.

        :param float value: Delay in seconds.
        """
        self._delay = value

    def resume(self) -> None:
        """
        Resumes the service execution.

        Does nothing when the service is already running.
        """
        if not self._running.is_set():
            # Flag first so the service loop sees it as soon as it wakes up.
            self._resumed = True
            self._running.set()
            SDK_LOG.info(f"{self.__class__.__name__} resumed")

    def pause(self) -> None:
        """
        Pauses the service execution.

        Waits for the current main task execution to finish before pausing.
        Does nothing when the service is already paused or no longer alive.
        """
        # ``stop()`` leaves the running event set, so liveness has to be
        # checked explicitly to avoid "pausing" a terminated service.
        if not (self._alive and self._running.is_set()):
            return
        self._executed.wait()
        # The service may have been stopped while waiting for the task.
        if not self._alive:
            return
        self._running.clear()
        self._on_pause()
        SDK_LOG.info(f"{self.__class__.__name__} paused")

    def stop(self) -> None:
        """
        Stops the service execution.

        Only requests the termination; use :meth:`join` to wait for it.
        """
        self._alive = False
        self._resumed = False
        # Wake the loop up in case it is paused so it can notice the request.
        self._running.set()

    def join(self) -> None:
        """
        Wait until service is fully terminated.
        """
        # Block on the event instead of polling it with short sleeps.
        self._ended.wait()

    def _service(self) -> None:
        """
        Service process execution.

        #. Runs the starting code.
        #. Run the execution loop until the service stop is requested.
        #. Runs the ending code and return.

        If any hook or the main task raises, the exception propagates to the
        caller, but the service is still marked as terminated so that
        :meth:`join` and :meth:`pause` do not block forever.
        """
        try:
            # Start service execution
            SDK_LOG.info(f"{self.__class__.__name__} started")
            self._on_start()

            # Execution loop
            while self._alive:
                # Handle pause / resume
                self._running.wait()
                if self._resumed:
                    self._on_resume()
                    self._resumed = False

                # Handle termination
                if not self._alive:
                    break

                # Run main task
                self._executed.clear()
                try:
                    self._task()
                finally:
                    self._executed.set()
                time.sleep(self._delay)

            # Run service ending code
            self._on_end()
        finally:
            # Always publish the termination, even on failure, and release
            # anyone waiting on a task execution.
            self._alive = False
            self._executed.set()
            self._ended.set()
        SDK_LOG.info(f"{self.__class__.__name__} ended")

    @abc.abstractmethod
    def _task(self) -> None:
        """
        Service core task/logic.
        """

    def _on_start(self) -> None:
        """
        Optional. Called on service start.
        """

    def _on_resume(self) -> None:
        """
        Optional. Called on service resume.
        """

    def _on_pause(self) -> None:
        """
        Optional. Called on service pause.
        """

    def _on_end(self) -> None:
        """
        Optional. Called upon service termination.
        """
# -->> Export <<-----------------------
# Public API of this module.
__all__ = ["AbstractService"]