diff --git a/coolamqp/attaches/consumer.py b/coolamqp/attaches/consumer.py index a63a1f3b811e32803746b2954be7002157299a1c..6043072bc0ba0172b0834e5321dbfc3ada51aeb8 100644 --- a/coolamqp/attaches/consumer.py +++ b/coolamqp/attaches/consumer.py @@ -35,7 +35,8 @@ class Consumer(Channeler): def __init__(self, queue, on_message, no_ack=True, qos=None, cancel_on_failure=False, future_to_notify=None, - fail_on_first_time_resource_locked=False + fail_on_first_time_resource_locked=False, + fucking_memoryviews=False ): """ :param queue: Queue object, being consumed from right now. @@ -60,6 +61,8 @@ class Consumer(Channeler): of a connection fail - next reconnect will consider this to be SECOND declaration, ie. it will retry ad infinitum :type fail_on_first_time_resource_locked: bool + :param fucking_memoryviews: if you set that to True, bodies of your ReceivedMessages will be iterables + of memoryviews, instead of bytes. Fast as fuck :/ """ super(Consumer, self).__init__() @@ -84,6 +87,7 @@ class Consumer(Channeler): self.future_to_notify = future_to_notify self.fail_on_first_time_resource_locked = fail_on_first_time_resource_locked self.cancel_on_failure = cancel_on_failure + self.fucking_memoryviews = fucking_memoryviews def set_qos(self, prefetch_size, prefetch_count): """ @@ -391,8 +395,15 @@ class MessageReceiver(object): self.acks_pending.add(self.bdeliver.delivery_tag) from coolamqp.objects import ReceivedMessage + + + if self.consumer.fucking_memoryviews: + body = self.body + else: + b''.join((mv.tobytes() for mv in self.body)) + rm = ReceivedMessage( - b''.join(map(six.binary_type, self.body)), #todo inefficient as FUUUCK + body, self.bdeliver.exchange, self.bdeliver.routing_key, self.header.properties, diff --git a/coolamqp/uplink/connection/recv_framer.py b/coolamqp/uplink/connection/recv_framer.py index 4e809557bd6ac5fa0df2a9e8d99f2c7dab7a0dad..7d9753331ac8897fd9dfd7187ba5a804b5a52dc0 100644 --- a/coolamqp/uplink/connection/recv_framer.py +++ b/coolamqp/uplink/connection/recv_framer.py @@ -75,7 +75,10 @@ class ReceivingFramer(object): def _statemachine(self): # state rule 1 if self.frame_type is None and self.total_data_len > 0: - self.frame_type = ord(self._extract(1)[0]) + if six.PY3: + self.frame_type = self._extract(1)[0] + else: + self.frame_type = ord(self._extract(1)[0]) if self.frame_type not in (FRAME_HEARTBEAT, FRAME_HEADER, FRAME_METHOD, FRAME_BODY): raise ValueError('Invalid frame') @@ -123,7 +126,11 @@ class ReceivingFramer(object): payload = buffer(payload.getvalue()) - if ord(self._extract(1)[0]) != FRAME_END: + z = self._extract(1)[0] + if six.PY2: + z = ord(z) + + if z != FRAME_END: raise ValueError('Invalid frame end') try: diff --git a/tests/test_clustering/test_a.py b/tests/test_clustering/test_a.py index a96764bdad53d453f66b75812f5ef7f2874a902c..f152f096b46adbd68a42f63da5524d72ee45c81a 100644 --- a/tests/test_clustering/test_a.py +++ b/tests/test_clustering/test_a.py @@ -30,7 +30,7 @@ class TestA(unittest.TestCase): # fut.result() con.cancel() - def test_send_recv(self): + def test_send_recv_zerolen(self): P = {'q': False} @@ -46,5 +46,21 @@ class TestA(unittest.TestCase): self.assertTrue(P['q']) + def test_send_recv_nonzerolen(self): + + P = {'q': False} + + def ok(e): + self.assertIsInstance(e, ReceivedMessage) + self.assertEquals(e.body, b'hello') + P['q'] = True + + con, fut = self.c.consume(Queue(u'hello', exclusive=True), on_message=ok, no_ack=True) + fut.result() + self.c.publish(Message(b'hello'), routing_key=u'hello', tx=True).result() + + time.sleep(1) + + self.assertTrue(P['q'])