diff --git a/coolamqp/uplink/connection/recv_framer.py b/coolamqp/uplink/connection/recv_framer.py index 4cdcb72df60092aa4c3c132392e0d1d1c7fc1dfb..24c474fae77e99b5c6be0bb3033459f47eabbc32 100644 --- a/coolamqp/uplink/connection/recv_framer.py +++ b/coolamqp/uplink/connection/recv_framer.py @@ -18,6 +18,11 @@ FRAME_TYPES = { FRAME_METHOD: AMQPMethodFrame } +if six.PY2: + ordpy2 = ord +else: + ordpy2 = lambda x: x + class ReceivingFramer(object): """ @@ -29,15 +34,20 @@ class ReceivingFramer(object): Not thread safe. State machine - (frame_type is None) and has_bytes(1) -> (frame_type <- bytes(1)) + (frame_type is None) and has_bytes(1) -> + (frame_type <- bytes(1)) + + (frame_type is HEARTBEAT) and has_bytes(AMQPHeartbeatFrame.LENGTH-1) + -> (output_frame, frame_type <- None) + (frame_type is not HEARTBEAT and not None) and has_bytes(6) -> + (frame_channel <- bytes(2), + frame_size <- bytes(4)) - (frame_type is HEARTBEAT) and has_bytes(AMQPHeartbeatFrame.LENGTH-1) -> (output_frame, frame_type <- None) - (frame_type is not HEARTBEAT and not None) and has_bytes(6) -> (frame_channel <- bytes(2), - frame_size <- bytes(4)) + (frame_size is not None) and has_bytes(frame_size+1) -> + (output_frame, - (frame_size is not None) and has_bytes(frame_size+1) -> (output_frame, - frame_type <- None - frame_size < None) + frame_type <- None + frame_size < None) """ def __init__(self, on_frame=lambda frame: None): @@ -64,9 +74,15 @@ class ReceivingFramer(object): while self._statemachine(): pass - def _extract(self, - up_to): # return up to up_to bytes from current chunk, switch if necessary - assert self.total_data_len >= up_to, 'Tried to extract %s but %s remaining' % ( + def _extract_single_byte(self): + return ordpy2(self._extract(1)[0]) + + def _extract(self, up_to): + """ + return up to up_to bytes from current chunk, switch if necessary + """ + assert self.total_data_len >= up_to, \ + 'Tried to extract %s but %s remaining' % ( up_to, self.total_data_len) if up_to >= len(self.chunks[0]): q = self.chunks.popleft() @@ -79,13 +95,11 @@ class ReceivingFramer(object): len(q), up_to) return q + def _statemachine(self): # state rule 1 if self.frame_type is None and self.total_data_len > 0: - if six.PY3: - self.frame_type = self._extract(1)[0] - else: - self.frame_type = ord(self._extract(1)[0]) + self.frame_type = self._extract_single_byte() if self.frame_type not in ( FRAME_HEARTBEAT, FRAME_HEADER, FRAME_METHOD, FRAME_BODY): @@ -141,11 +155,7 @@ class ReceivingFramer(object): payload = memoryview(payload.getvalue()) - z = self._extract(1)[0] - if six.PY2: - z = ord(z) - - if z != FRAME_END: + if self._extract_single_byte() != FRAME_END: raise ValueError('Invalid frame end') try: