diff --git a/CHANGELOG.md b/CHANGELOG.md index 8e927920aabc20c346f3897a1d21ead66f98977f..0a69cf3753e96939a5a7fd78eccdada96eb1ef2a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1 +1,3 @@ -# v2.17.24 +# v2.18 + +* added `satella.parsing` diff --git a/docs/exceptions.rst b/docs/exceptions.rst index 4a41cd157635504882ebae05936ef02022872be8..249c6920f89dd88b519d5ea5250b71392753315e 100644 --- a/docs/exceptions.rst +++ b/docs/exceptions.rst @@ -196,3 +196,10 @@ ProcessFailed .. autoclass:: satella.exceptions.ProcessFailed :members: + + +NotEnoughBytes +~~~~~~~~~~~~~~ + +.. autoclass:: satella.exceptions.NotEnoughBytes + :members: diff --git a/docs/parsing.rst b/docs/parsing.rst new file mode 100644 index 0000000000000000000000000000000000000000..bc1e8da8257029791769af1147c402d1b94b63da --- /dev/null +++ b/docs/parsing.rst @@ -0,0 +1,15 @@ +Utilities for making binary parsers +=================================== + +BinaryParser +~~~~~~~~~~~~ + +.. autoclass:: satella.parsing.BinaryParser + :members: + + +NotEnoughBytes +~~~~~~~~~~~~~~ + +.. autoclass:: satella.exceptions.NotEnoughBytes + :members: diff --git a/satella/__init__.py b/satella/__init__.py index c818f9bc39ad1024d79380125b9f5fd2e44b6dbe..2478e98a66824233432789d757bc948ca956fc8d 100644 --- a/satella/__init__.py +++ b/satella/__init__.py @@ -1 +1 @@ -__version__ = '2.17.24a1' +__version__ = '2.18' diff --git a/satella/exceptions.py b/satella/exceptions.py index 701d9a5e3b124eb2ca169aab3e68ec6f68395290..9ac1a0ed29af1b90dcba4b1ed22dbe9735a30b5e 100644 --- a/satella/exceptions.py +++ b/satella/exceptions.py @@ -7,7 +7,7 @@ __all__ = ['BaseSatellaError', 'ResourceLockingError', 'ResourceNotLocked', 'Res 'PreconditionError', 'MetricAlreadyExists', 'BaseSatellaException', 'CustomException', 'CodedCustomException', 'CodedCustomExceptionMetaclass', 'WouldWaitMore', 'ProcessFailed', 'AlreadyAllocated', 'Empty', 'ImpossibleError', - 'ConfigurationMisconfiguredError'] + 'ConfigurationMisconfiguredError', 'NotEnoughBytes'] class CustomException(Exception): @@ -237,3 +237,9 @@ class ImpossibleError(BaseException): This is a BaseException, since it should be propagated upwards as soon as possible! """ + + +class NotEnoughBytes(BaseSatellaError): + """ + Not enough bytes in the parser remain to satisfy this request + """ diff --git a/satella/parsing.py b/satella/parsing.py new file mode 100644 index 0000000000000000000000000000000000000000..3fb65820dfcc7b81e4ef30470f10e9bdd1078a1f --- /dev/null +++ b/satella/parsing.py @@ -0,0 +1,99 @@ +import struct +import typing as tp + +from satella.exceptions import NotEnoughBytes + + +class BinaryParser: + """ + A class that allows parsing binary streams easily + + :param b_stream: an object that allows indiced access, and allows subscripts to + span ranges, which will return items parseable by struct + :param offset: initial offset into the stream + :raises NotEnoughBytes: offset larger than stream length + + :ivar offset: offset from which bytes will be readed + """ + def __init__(self, b_stream: tp.Union[bytes, bytearray], offset: int = 0): + self.b_stream = b_stream + self.struct_cache = {} + self.stream_length = len(b_stream) + self.pointer = offset + if offset > len(self.b_stream): + raise NotEnoughBytes('Offset larger than the stream!') + + def _to_struct(self, st: tp.Union[str, struct.Struct]) -> struct.Struct: + if isinstance(st, struct.Struct): + return st + else: + if st in self.struct_cache: + fmt = st + return self.struct_cache[fmt] + else: + fmt = st + st = struct.Struct(st) + self.struct_cache[fmt] = st + return st + + def get_bytes(self, n: int) -> bytes: + """ + Return this many bytes + + :param n: amount of bytes to return + :return: bytes returned + :raises NotEnoughBytes: not enough bytes remain in the stream! + """ + if self.stream_length < self.pointer + n: + raise NotEnoughBytes('Not enough bytes') + try: + return self.b_stream[self.pointer:self.pointer+n] + finally: + self.pointer += n + + def get_struct(self, st: tp.Union[str, struct.Struct]) -> tp.Union[int, float]: + """ + Try to obtain as many bytes as this struct requires and return them parsed. + + This must be a single-character struct! + + :param st: a single-character struct.Struct or a single character struct specification + :return: a value returned from it + :raises NotEnoughBytes: not enough bytes remain in the stream! + :raises AssertionError: struct was not a single element one! + """ + st = self._to_struct(st) + + if st.format[0] in {'>', '<', '!', '@'}: + assert len(st.format) == 2, 'Format must span at most 1 character, use ' \ + 'get_structs for multiples!' + else: + assert len(st.format) == 1, 'Format must span at most 1 character, use ' \ + 'get_structs for multiples!' + + st_len = st.size + if self.stream_length < self.pointer + st_len: + raise NotEnoughBytes('Not enough bytes') + + try: + return st.unpack(self.b_stream[self.pointer:self.pointer+st_len])[0] + finally: + self.pointer += st_len + + def get_structs(self, st: tp.Union[str, struct.Struct]) -> tp.Tuple[tp.Union[int, float], + ...]: + """ + Try to obtain as many bytes as this struct requires and return them parsed. + + :param st: a struct.Struct or a multi character struct specification + :return: a tuple of un-parsed values + :raises NotEnoughBytes: not enough bytes remain in the stream! + """ + st = self._to_struct(st) + st_len = st.size + if self.stream_length < self.pointer + st_len: + raise NotEnoughBytes('Not enough bytes') + try: + return st.unpack(self.b_stream[self.pointer:self.pointer+st_len]) + finally: + self.pointer += st_len diff --git a/tests/test_parsing.py b/tests/test_parsing.py new file mode 100644 index 0000000000000000000000000000000000000000..20da8c159ea5e71c00ed5237b959aac9123802c4 --- /dev/null +++ b/tests/test_parsing.py @@ -0,0 +1,16 @@ +import struct +import unittest + +from satella.exceptions import NotEnoughBytes +from satella.parsing import BinaryParser + + +class TestParsing(unittest.TestCase): + def test_something(self): + bp = BinaryParser(b'\x00\x00\x01\x02\x00\x00\x00\xFF\x00') + self.assertEqual(bp.get_bytes(2), b'\x00\x00') + self.assertEqual(bp.get_structs('BB'), (1, 2)) + self.assertEqual(bp.get_struct(struct.Struct('>L')), 255) + self.assertRaises(NotEnoughBytes, lambda: bp.get_struct('>L')) + self.assertRaises(NotEnoughBytes, lambda: bp.get_bytes(5)) + self.assertRaises(NotEnoughBytes, lambda: BinaryParser(b'', 1))