From 938a7b3148e983f5519db7151e680a710e76743a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Ma=C5=9Blanka?= <pmaslanka@smok.co> Date: Thu, 14 Oct 2021 19:32:33 +0200 Subject: [PATCH] add parsing --- CHANGELOG.md | 4 +- docs/exceptions.rst | 7 +++ docs/parsing.rst | 15 +++++++ satella/__init__.py | 2 +- satella/exceptions.py | 8 +++- satella/parsing.py | 99 +++++++++++++++++++++++++++++++++++++++++++ tests/test_parsing.py | 16 +++++++ 7 files changed, 148 insertions(+), 3 deletions(-) create mode 100644 docs/parsing.rst create mode 100644 satella/parsing.py create mode 100644 tests/test_parsing.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 8e927920..0a69cf37 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1 +1,3 @@ -# v2.17.24 +# v2.18 + +* added `satella.parsing` diff --git a/docs/exceptions.rst b/docs/exceptions.rst index 4a41cd15..249c6920 100644 --- a/docs/exceptions.rst +++ b/docs/exceptions.rst @@ -196,3 +196,10 @@ ProcessFailed .. autoclass:: satella.exceptions.ProcessFailed :members: + + +NotEnoughBytes +~~~~~~~~~~~~~~ + +.. autoclass:: satella.exceptions.NotEnoughBytes + :members: diff --git a/docs/parsing.rst b/docs/parsing.rst new file mode 100644 index 00000000..bc1e8da8 --- /dev/null +++ b/docs/parsing.rst @@ -0,0 +1,15 @@ +Utilities for making binary parsers +=================================== + +BinaryParser +~~~~~~~~~~~~ + +.. autoclass:: satella.parsing.BinaryParser + :members: + + +NotEnoughBytes +~~~~~~~~~~~~~~ + +.. autoclass:: satella.exceptions.NotEnoughBytes + :members: diff --git a/satella/__init__.py b/satella/__init__.py index c818f9bc..2478e98a 100644 --- a/satella/__init__.py +++ b/satella/__init__.py @@ -1 +1 @@ -__version__ = '2.17.24a1' +__version__ = '2.18' diff --git a/satella/exceptions.py b/satella/exceptions.py index 701d9a5e..9ac1a0ed 100644 --- a/satella/exceptions.py +++ b/satella/exceptions.py @@ -7,7 +7,7 @@ __all__ = ['BaseSatellaError', 'ResourceLockingError', 'ResourceNotLocked', 'Res 'PreconditionError', 'MetricAlreadyExists', 'BaseSatellaException', 'CustomException', 'CodedCustomException', 'CodedCustomExceptionMetaclass', 'WouldWaitMore', 'ProcessFailed', 'AlreadyAllocated', 'Empty', 'ImpossibleError', - 'ConfigurationMisconfiguredError'] + 'ConfigurationMisconfiguredError', 'NotEnoughBytes'] class CustomException(Exception): @@ -237,3 +237,9 @@ class ImpossibleError(BaseException): This is a BaseException, since it should be propagated upwards as soon as possible! """ + + +class NotEnoughBytes(BaseSatellaError): + """ + Not enough bytes in the parser remain to satisfy this request + """ diff --git a/satella/parsing.py b/satella/parsing.py new file mode 100644 index 00000000..3fb65820 --- /dev/null +++ b/satella/parsing.py @@ -0,0 +1,99 @@ +import struct +import typing as tp + +from satella.exceptions import NotEnoughBytes + + +class BinaryParser: + """ + A class that allows parsing binary streams easily + + :param b_stream: an object that allows indiced access, and allows subscripts to + span ranges, which will return items parseable by struct + :param offset: initial offset into the stream + :raises NotEnoughBytes: offset larger than stream length + + :ivar offset: offset from which bytes will be readed + """ + def __init__(self, b_stream: tp.Union[bytes, bytearray], offset: int = 0): + self.b_stream = b_stream + self.struct_cache = {} + self.stream_length = len(b_stream) + self.pointer = offset + if offset > len(self.b_stream): + raise NotEnoughBytes('Offset larger than the stream!') + + def _to_struct(self, st: tp.Union[str, struct.Struct]) -> struct.Struct: + if isinstance(st, struct.Struct): + return st + else: + if st in self.struct_cache: + fmt = st + return self.struct_cache[fmt] + else: + fmt = st + st = struct.Struct(st) + self.struct_cache[fmt] = st + return st + + def get_bytes(self, n: int) -> bytes: + """ + Return this many bytes + + :param n: amount of bytes to return + :return: bytes returned + :raises NotEnoughBytes: not enough bytes remain in the stream! + """ + if self.stream_length < self.pointer + n: + raise NotEnoughBytes('Not enough bytes') + try: + return self.b_stream[self.pointer:self.pointer+n] + finally: + self.pointer += n + + def get_struct(self, st: tp.Union[str, struct.Struct]) -> tp.Union[int, float]: + """ + Try to obtain as many bytes as this struct requires and return them parsed. + + This must be a single-character struct! + + :param st: a single-character struct.Struct or a single character struct specification + :return: a value returned from it + :raises NotEnoughBytes: not enough bytes remain in the stream! + :raises AssertionError: struct was not a single element one! + """ + st = self._to_struct(st) + + if st.format[0] in {'>', '<', '!', '@'}: + assert len(st.format) == 2, 'Format must span at most 1 character, use ' \ + 'get_structs for multiples!' + else: + assert len(st.format) == 1, 'Format must span at most 1 character, use ' \ + 'get_structs for multiples!' + + st_len = st.size + if self.stream_length < self.pointer + st_len: + raise NotEnoughBytes('Not enough bytes') + + try: + return st.unpack(self.b_stream[self.pointer:self.pointer+st_len])[0] + finally: + self.pointer += st_len + + def get_structs(self, st: tp.Union[str, struct.Struct]) -> tp.Tuple[tp.Union[int, float], + ...]: + """ + Try to obtain as many bytes as this struct requires and return them parsed. + + :param st: a struct.Struct or a multi character struct specification + :return: a tuple of un-parsed values + :raises NotEnoughBytes: not enough bytes remain in the stream! + """ + st = self._to_struct(st) + st_len = st.size + if self.stream_length < self.pointer + st_len: + raise NotEnoughBytes('Not enough bytes') + try: + return st.unpack(self.b_stream[self.pointer:self.pointer+st_len]) + finally: + self.pointer += st_len diff --git a/tests/test_parsing.py b/tests/test_parsing.py new file mode 100644 index 00000000..20da8c15 --- /dev/null +++ b/tests/test_parsing.py @@ -0,0 +1,16 @@ +import struct +import unittest + +from satella.exceptions import NotEnoughBytes +from satella.parsing import BinaryParser + + +class TestParsing(unittest.TestCase): + def test_something(self): + bp = BinaryParser(b'\x00\x00\x01\x02\x00\x00\x00\xFF\x00') + self.assertEqual(bp.get_bytes(2), b'\x00\x00') + self.assertEqual(bp.get_structs('BB'), (1, 2)) + self.assertEqual(bp.get_struct(struct.Struct('>L')), 255) + self.assertRaises(NotEnoughBytes, lambda: bp.get_struct('>L')) + self.assertRaises(NotEnoughBytes, lambda: bp.get_bytes(5)) + self.assertRaises(NotEnoughBytes, lambda: BinaryParser(b'', 1)) -- GitLab