diff --git a/CHANGELOG.md b/CHANGELOG.md index 7df23e0c1937d92572d27d7e4e811dfbc318b2cf..1ee7a1a2bb61a5da92dac21557065929a8a9481a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,3 +2,4 @@ * added `BinaryParser.get_remaining_bytes` * added `BinaryParser.get_remaining_bytes_count` +* added `BinaryParser.get_parser` diff --git a/satella/__init__.py b/satella/__init__.py index 003b957117fc3b97261e503aca3a766106954ad3..75ab0538ef42bd448949b10e61a73bf3c165f9e0 100644 --- a/satella/__init__.py +++ b/satella/__init__.py @@ -1 +1 @@ -__version__ = '2.18.1a2' +__version__ = '2.18.1' diff --git a/satella/parsing.py b/satella/parsing.py index c8098b7021a0348f71fe87356305b6775689f85e..e98be49407b758c8e84581ac3f290ecfde4d9995 100644 --- a/satella/parsing.py +++ b/satella/parsing.py @@ -1,3 +1,4 @@ +from __future__ import absolute_import import struct import typing as tp @@ -8,32 +9,61 @@ class BinaryParser: """ A class that allows parsing binary streams easily. - This supports __len__ to return the amount of bytes remaining. + This supports __len__ to return the amount of bytes in the stream, + and __bytes__ to return the bytes. :param b_stream: an object that allows indiced access, and allows subscripts to span ranges, which will return items parseable by struct :param offset: initial offset into the stream + :param length: optional maximum length of byte count :raises NotEnoughBytes: offset larger than stream length :ivar offset: offset from which bytes will be readed """ def __len__(self) -> int: - return self.get_remaining_bytes_count() + return self.length def get_remaining_bytes_count(self) -> int: """ Return the amount of bytes remaining. This will not advance the pointer """ - return self.stream_length - self.pointer + return self.length - self.pointer - def __init__(self, b_stream: tp.Union[bytes, bytearray], offset: int = 0): + def __init__(self, b_stream: tp.Union[bytes, bytearray], offset: int = 0, + length: tp.Optional[int] = None): self.b_stream = b_stream + self.pointer = self.init_ofs = offset self.struct_cache = {} - self.stream_length = len(b_stream) - self.pointer = offset + self.length = length or len(b_stream) if offset > len(self.b_stream): raise NotEnoughBytes('Offset larger than the stream!') + def __bytes__(self) -> bytes: + return self.b_stream[self.init_ofs:self.init_ofs+self.length] + + def get_parser(self, length: int) -> 'BinaryParser': + """ + Return a subclassed binary parser providing a window to another binary parser's data. + + This will advance the pointer by length + + :param length: amount of bytes to view + :return: a BinaryParser + """ + if self.length < self.pointer + length: + raise NotEnoughBytes('Not enough bytes') + try: + return BinaryParser(self.b_stream, self.pointer, length) + finally: + self.pointer += length + + def reset(self) -> None: + """ + Reset the internal pointer to starting value + :return: + """ + self.pointer = self.init_ofs + def _to_struct(self, st: tp.Union[str, struct.Struct]) -> struct.Struct: if isinstance(st, struct.Struct): return st @@ -55,7 +85,7 @@ class BinaryParser: :return: bytes returned :raises NotEnoughBytes: not enough bytes remain in the stream! """ - if self.stream_length < self.pointer + n: + if self.length < self.pointer + n: raise NotEnoughBytes('Not enough bytes') try: return self.b_stream[self.pointer:self.pointer+n] @@ -85,7 +115,7 @@ class BinaryParser: 'get_structs for multiples!' st_len = st.size - if self.stream_length < self.pointer + st_len: + if self.length < self.pointer + st_len: raise NotEnoughBytes('Not enough bytes') try: @@ -106,7 +136,7 @@ class BinaryParser: """ st = self._to_struct(st) st_len = st.size - if self.stream_length < self.pointer + st_len: + if self.length < self.pointer + st_len: raise NotEnoughBytes('Not enough bytes') try: return st.unpack(self.b_stream[self.pointer:self.pointer+st_len]) diff --git a/tests/test_parsing.py b/tests/test_parsing.py index 796f858279a8b76414201dc532b9ed7769226910..aa375e27e0fd3583dabe329c27675b379fc8fc6f 100644 --- a/tests/test_parsing.py +++ b/tests/test_parsing.py @@ -15,4 +15,11 @@ class TestParsing(unittest.TestCase): self.assertRaises(NotEnoughBytes, lambda: bp.get_bytes(5)) self.assertRaises(NotEnoughBytes, lambda: BinaryParser(b'', 1)) self.assertEqual(bp.get_remaining_bytes(), b'\x00') - self.assertEqual(len(bp), 1) + self.assertEqual(len(bp), 9) + bp.reset() + bp2 = bp.get_parser(4) + self.assertEqual(len(bp2), 4) + self.assertEqual(bytes(bp2), b'\x00\x00\x01\x02') + self.assertEqual(bp2.get_struct('>L'), 258) + self.assertRaises(NotEnoughBytes, lambda: bp2.get_struct('>L')) + self.assertEqual(bp.pointer, 4)