Source code for embutils.utils.cobs

#!/usr/bin/python
# -*- coding: utf-8 -*-
"""
COBS encoding/decoding implementation.

:date:      2021
:author:    Christian Wiche
:contact:   cwichel@gmail.com
:license:   The MIT License (MIT)
"""
# -------------------------------------

import typing as tp

import attr


# -->> Tunables <<---------------------


# -->> Definitions <<------------------


# -->> API <<--------------------------
[docs]class COBS: """ Consistent Overhead Byte Stuffing (COBS) encoding/decoding utilities. """
[docs] class DecodeException(Exception): """ COBS decoding exception. """
[docs] @attr.define class Block: """ COBS encoding block. """ #: Block code code: int = attr.field(converter=int) #: Block data array data: bytearray = attr.field(converter=bytearray) #: Flag. Defines if is required to add a zero at the end. zero: bool = attr.field(converter=bool, default=False)
[docs] @staticmethod def encode(data: bytearray = bytearray()) -> bytearray: """ Encode a byte array using Consistent Overhead Byte Stuffing (COBS). .. note:: * Encoding guarantees non-zero bytes on the output array (besides the terminator). * An empty string is encoded to [0x01, 0x00] :param bytearray data: Bytes to be encoded. :returns: Encoded byte array. :rtype: bytearray """ # Prepare blocks: tp.List[COBS.Block] = [] base = data.copy() base.append(0x00) # Encode while base: idx = base.find(0x00) code = (idx + 1) if (idx != -1) else len(base) if code > 0xFE: # Extended block: blocks.append(COBS.Block(code=0xFF, data=base[0:0xFE])) del base[0:0xFE] else: # Block zero terminated: blocks.append(COBS.Block(code=code, data=base[0:(code - 1)])) del base[0:code] # Process encoding out = bytearray() for block in blocks: out.append(block.code) out.extend(block.data) out.append(0x00) return out
[docs] @staticmethod def decode(data: bytearray) -> bytearray: """ Decodes a byte array that was encoded using Consistent Overhead Byte Stuffing (COBS). :param bytearray data: Bytes to be decoded. :returns: Decoded byte array. :rtype: bytearray :raises COBS.DecodeException: Encoded data is invalid. """ # Prepare blocks: tp.List[COBS.Block] = [] base = data.copy() # Decode while base: # Check if the termination byte is received code = base[0] if code == 0x00: break # Add zero to last block, if needed if blocks: blocks[-1].zero = (blocks[-1].code != 0xFF) # Get block and remove bytes block = COBS.Block(code=code, data=base[1:code]) del base[0:code] # Check block if block.data.find(0x00) != -1: raise COBS.DecodeException("Zero byte found in input!") if len(block.data) != (code - 1): raise COBS.DecodeException("Not enough bytes to process!") # Append block blocks.append(block) # Process decoding out = bytearray() for block in blocks: out.extend(block.data) if block.zero: out.append(0x00) return out
# -->> Export <<----------------------- __all__ = [ "COBS", ]