From 23943514db42eb133f53689e1dc3303641f5d846 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Ma=C5=9Blanka?= <pmaslanka@smok.co> Date: Thu, 14 Oct 2021 20:02:32 +0200 Subject: [PATCH] v2.18.1 --- CHANGELOG.md | 1 + satella/__init__.py | 2 +- satella/parsing.py | 48 +++++++++++++++++++++++++++++++++++-------- tests/test_parsing.py | 9 +++++++- 4 files changed, 49 insertions(+), 11 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7df23e0c..1ee7a1a2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,3 +2,4 @@ * added `BinaryParser.get_remaining_bytes` * added `BinaryParser.get_remaining_bytes_count` +* added `BinaryParser.get_parser` diff --git a/satella/__init__.py b/satella/__init__.py index 003b9571..75ab0538 100644 --- a/satella/__init__.py +++ b/satella/__init__.py @@ -1 +1 @@ -__version__ = '2.18.1a2' +__version__ = '2.18.1' diff --git a/satella/parsing.py b/satella/parsing.py index c8098b70..e98be494 100644 --- a/satella/parsing.py +++ b/satella/parsing.py @@ -1,3 +1,4 @@ +from __future__ import absolute_import import struct import typing as tp @@ -8,32 +9,61 @@ class BinaryParser: """ A class that allows parsing binary streams easily. - This supports __len__ to return the amount of bytes remaining. + This supports __len__ to return the amount of bytes in the stream, + and __bytes__ to return the bytes. :param b_stream: an object that allows indiced access, and allows subscripts to span ranges, which will return items parseable by struct :param offset: initial offset into the stream + :param length: optional maximum length of byte count :raises NotEnoughBytes: offset larger than stream length :ivar offset: offset from which bytes will be readed """ def __len__(self) -> int: - return self.get_remaining_bytes_count() + return self.length def get_remaining_bytes_count(self) -> int: """ Return the amount of bytes remaining. This will not advance the pointer """ - return self.stream_length - self.pointer + return self.length - self.pointer - def __init__(self, b_stream: tp.Union[bytes, bytearray], offset: int = 0): + def __init__(self, b_stream: tp.Union[bytes, bytearray], offset: int = 0, + length: tp.Optional[int] = None): self.b_stream = b_stream + self.pointer = self.init_ofs = offset self.struct_cache = {} - self.stream_length = len(b_stream) - self.pointer = offset + self.length = length or len(b_stream) if offset > len(self.b_stream): raise NotEnoughBytes('Offset larger than the stream!') + def __bytes__(self) -> bytes: + return self.b_stream[self.init_ofs:self.init_ofs+self.length] + + def get_parser(self, length: int) -> 'BinaryParser': + """ + Return a subclassed binary parser providing a window to another binary parser's data. + + This will advance the pointer by length + + :param length: amount of bytes to view + :return: a BinaryParser + """ + if self.length < self.pointer + length: + raise NotEnoughBytes('Not enough bytes') + try: + return BinaryParser(self.b_stream, self.pointer, length) + finally: + self.pointer += length + + def reset(self) -> None: + """ + Reset the internal pointer to starting value + :return: + """ + self.pointer = self.init_ofs + def _to_struct(self, st: tp.Union[str, struct.Struct]) -> struct.Struct: if isinstance(st, struct.Struct): return st @@ -55,7 +85,7 @@ class BinaryParser: :return: bytes returned :raises NotEnoughBytes: not enough bytes remain in the stream! """ - if self.stream_length < self.pointer + n: + if self.length < self.pointer + n: raise NotEnoughBytes('Not enough bytes') try: return self.b_stream[self.pointer:self.pointer+n] @@ -85,7 +115,7 @@ class BinaryParser: 'get_structs for multiples!' st_len = st.size - if self.stream_length < self.pointer + st_len: + if self.length < self.pointer + st_len: raise NotEnoughBytes('Not enough bytes') try: @@ -106,7 +136,7 @@ class BinaryParser: """ st = self._to_struct(st) st_len = st.size - if self.stream_length < self.pointer + st_len: + if self.length < self.pointer + st_len: raise NotEnoughBytes('Not enough bytes') try: return st.unpack(self.b_stream[self.pointer:self.pointer+st_len]) diff --git a/tests/test_parsing.py b/tests/test_parsing.py index 796f8582..aa375e27 100644 --- a/tests/test_parsing.py +++ b/tests/test_parsing.py @@ -15,4 +15,11 @@ class TestParsing(unittest.TestCase): self.assertRaises(NotEnoughBytes, lambda: bp.get_bytes(5)) self.assertRaises(NotEnoughBytes, lambda: BinaryParser(b'', 1)) self.assertEqual(bp.get_remaining_bytes(), b'\x00') - self.assertEqual(len(bp), 1) + self.assertEqual(len(bp), 9) + bp.reset() + bp2 = bp.get_parser(4) + self.assertEqual(len(bp2), 4) + self.assertEqual(bytes(bp2), b'\x00\x00\x01\x02') + self.assertEqual(bp2.get_struct('>L'), 258) + self.assertRaises(NotEnoughBytes, lambda: bp2.get_struct('>L')) + self.assertEqual(bp.pointer, 4) -- GitLab