#!/usr/bin/python
# -*- coding: utf-8 -*-
"""
Stream utilities.
:date: 2021
:author: Christian Wiche
:contact: cwichel@gmail.com
:license: The MIT License (MIT)
"""
# -------------------------------------
import contextlib as ctx
import io
import queue
import threading as th
import typing as tp
from .common import ENCODE
from .threading import SDK_TP, SimpleThreadTask
# -->> Tunables <<---------------------
# -->> Definitions <<------------------
# -->> API <<--------------------------
@ctx.contextmanager
def unclosable(file: io.IOBase) -> tp.Iterator[io.IOBase]:
    """
    Makes file unclosable during the context execution.

    While the context is active, ``file.close`` is shadowed on the instance by
    a function that does nothing. When the context exits (normally or through
    an exception) the shadow is removed so ``file.close`` behaves as before.

    :param io.IOBase file: File to protect during context.
    :return: Iterator yielding the same file object with ``close`` disabled.
    :rtype: Iterator[io.IOBase]
    """
    instance_attrs = getattr(file, "__dict__", None)
    # Re-assignment is only needed when the instance already carried its own
    # ``close`` override, or when there is no instance dict to drop the shadow from.
    reassign = instance_attrs is None or "close" in instance_attrs
    original = getattr(file, "close")

    # Installed before the ``try``: if this fails nothing was modified, so
    # there is nothing to restore.
    setattr(file, "close", lambda: None)
    try:
        yield file
    finally:
        if reassign:
            setattr(file, "close", original)
        else:
            # Dropping the shadow re-exposes the class-level method and avoids
            # storing a bound method on the file (file -> method -> file cycle).
            instance_attrs.pop("close", None)
class StreamRedirect:
    """
    Stream redirect utility implementation.

    Copies every line read from an input stream into an internal buffer and
    into an output stream. Reading and writing run as two separate tasks
    enqueued on ``SDK_TP`` that communicate through a queue.
    """

    def __init__(self, name: str, stream_in: tp.IO[tp.AnyStr], stream_out: tp.IO[tp.AnyStr]) -> None:
        """
        Class initialization.

        :param str name: Stream redirection name (used for naming threads).
        :param IO[AnyStr] stream_in: Input stream. Will be stored on buffer and redirected to output.
        :param IO[AnyStr] stream_out: Output stream.
        """
        # Configure redirection operation
        self._src = stream_in
        self._out = stream_out
        self._queue = queue.Queue()     # Lines handed from reader to writer; None marks the end
        self._buff = []                 # Every line already forwarded to the output
        self._ready = th.Event()        # Set once the writer task has finished

        # Start
        SDK_TP.enqueue(SimpleThreadTask(name=f"{self.__class__.__name__}_{name}_read", task=self._read))
        SDK_TP.enqueue(SimpleThreadTask(name=f"{self.__class__.__name__}_{name}_write", task=self._write))

    @property
    def buffer(self) -> str:
        """
        Stream buffer.

        :return: Concatenation of all the lines forwarded so far.
        :rtype: str
        """
        return "".join(self._buff)

    def join(self) -> None:
        """
        Waits until the stream is finished to continue.
        """
        self._ready.wait()

    def _write(self) -> None:
        """
        Writes every line into the output stream and buffer.

        Consumes the queue until the ``None`` end marker is received.
        """
        try:
            for line in iter(self._queue.get, None):
                self._buff.append(line)
                self._out.write(line)
        finally:
            # Always release join(), even if the output stream failed
            self._ready.set()

    def _read(self) -> None:
        """
        Parses and copies every line of the input stream.

        Lines read as bytes are decoded using ``ENCODE`` (undecodable bytes are
        dropped); lines read as text are forwarded unchanged.
        """
        try:
            while True:
                line = self._src.readline()
                # An empty read ("" or b"") marks the end of the stream
                if not line:
                    break
                if isinstance(line, bytes):
                    line = line.decode(encoding=ENCODE, errors="ignore")
                self._queue.put(line)
        finally:
            # Always send the end marker first so the writer task can finish
            # even if reading (or closing) failed
            self._queue.put(None)
            self._src.close()
# -->> Export <<-----------------------
# Public names exported by this module
__all__ = ["unclosable", "StreamRedirect"]