# Source code for embutils.utils.stream

#!/usr/bin/python
# -*- coding: utf-8 -*-
"""
Stream utilities.

:date:      2021
:author:    Christian Wiche
:contact:   cwichel@gmail.com
:license:   The MIT License (MIT)
"""
# -------------------------------------

import contextlib as ctx
import io
import queue
import threading as th
import typing as tp

from .common import ENCODE
from .threading import SDK_TP, SimpleThreadTask


# -->> Tunables <<---------------------


# -->> Definitions <<------------------


# -->> API <<--------------------------
@ctx.contextmanager
def unclosable(file: io.IOBase) -> tp.Iterator[io.IOBase]:
    """
    Makes file unclosable during the context execution.

    While the context is active, ``file.close`` is replaced on the instance by
    a no-op, so neither explicit ``close()`` calls nor code that looks the
    method up on the object will close it. On exit the object is put back in
    the state it had before entering the context.

    :param io.IOBase file: File to protect during context.

    :return: Iterator yielding the same (temporarily unclosable) file object.
    """
    # Remember whether "close" was already overridden on the instance itself
    # (for example by an enclosing ``unclosable`` context). Only in that case
    # must the previous value be written back; otherwise the override is
    # simply removed so the class-level method becomes visible again.
    shadowed = "close" in getattr(file, "__dict__", {})
    close = getattr(file, "close")
    try:
        # Passthrough close function and use file
        setattr(file, "close", lambda: None)
        yield file
    finally:
        # Restore close function
        if shadowed:
            setattr(file, "close", close)
        else:
            # Deleting (instead of storing the bound method on the instance)
            # avoids a file -> bound method -> file reference cycle.
            try:
                delattr(file, "close")
            except AttributeError:
                pass
            # Fallback for objects whose "close" is not provided by the class
            # (e.g. stored in a slot): put the original callable back.
            if not hasattr(file, "close"):
                setattr(file, "close", close)
class StreamRedirect:
    """
    Stream redirect utility implementation.
    Allows storing and redirecting a given stream.

    On construction two tasks are enqueued on ``SDK_TP``: a reader that moves
    the lines of the input stream into an internal queue, and a writer that
    drains that queue into the buffer and the output stream.
    """
    def __init__(self, name: str, stream_in: tp.IO[tp.AnyStr], stream_out: tp.IO[tp.AnyStr]) -> None:
        """
        Class initialization.

        :param str name:                Stream redirection name (used for naming threads).
        :param IO[AnyStr] stream_in:    Input stream. Will be stored on buffer and redirected to output.
        :param IO[AnyStr] stream_out:   Output stream.
        """
        # Configure redirection operation
        self._src = stream_in
        self._out = stream_out
        # Lines travel from reader to writer through this queue; ``None`` marks the end of the stream
        self._queue = queue.Queue()
        self._buff = []
        # Set by the writer once it stops processing lines
        self._ready = th.Event()

        # Start
        SDK_TP.enqueue(SimpleThreadTask(name=f"{self.__class__.__name__}_{name}_read", task=self._read))
        SDK_TP.enqueue(SimpleThreadTask(name=f"{self.__class__.__name__}_{name}_write", task=self._write))

    @property
    def buffer(self) -> str:
        """
        Stream buffer.

        Concatenation of every line the writer has stored so far.
        """
        return "".join(self._buff)

    def join(self) -> None:
        """
        Waits until the stream is finished to continue.

        Blocks until the writer task has signalled that it stopped.
        """
        self._ready.wait()

    def _write(self) -> None:
        """
        Writes every line into the output stream and buffer.

        Runs until the ``None`` end-of-stream marker is taken from the queue.
        The ready event is set on exit even when writing to the output stream
        raises, so a broken output does not leave :meth:`join` blocked.
        """
        try:
            for line in iter(self._queue.get, None):
                self._buff.append(line)
                self._out.write(line)
        finally:
            self._ready.set()

    def _read(self) -> None:
        """
        Parses and copies every line of the input stream.

        Lines read as ``bytes`` are decoded with ``ENCODE`` (undecodable bytes
        are ignored); lines that are already ``str`` are forwarded unchanged.
        The end-of-stream marker is always queued and the input stream is
        always closed, even if reading or decoding fails, so the writer task
        can terminate.
        """
        try:
            while True:
                line = self._src.readline()
                # An empty result (b"" or "") means the stream is exhausted
                if not line:
                    break
                if isinstance(line, bytes):
                    line = line.decode(encoding=ENCODE, errors="ignore")
                self._queue.put(line)
        finally:
            # Queue the marker before closing: it must be delivered even if close() raises
            self._queue.put(None)
            self._src.close()
# -->> Export <<-----------------------
# Public names of this module (the ones exposed by a star-import).
__all__ = [
    "unclosable",
    "StreamRedirect",
]