From 6ce923e6bf81b07639db659f6a2652bdf29b3c68 Mon Sep 17 00:00:00 2001 From: Piotr Maslanka <piotr.maslanka@henrietta.com.pl> Date: Tue, 10 Jan 2017 05:59:02 +0100 Subject: [PATCH] memoryvieeeews --- coolamqp/attaches/consumer.py | 15 +++++++++++++-- coolamqp/uplink/connection/recv_framer.py | 11 +++++++++-- tests/test_clustering/test_a.py | 18 +++++++++++++++++- 3 files changed, 39 insertions(+), 5 deletions(-) diff --git a/coolamqp/attaches/consumer.py b/coolamqp/attaches/consumer.py index a63a1f3..6043072 100644 --- a/coolamqp/attaches/consumer.py +++ b/coolamqp/attaches/consumer.py @@ -35,7 +35,8 @@ class Consumer(Channeler): def __init__(self, queue, on_message, no_ack=True, qos=None, cancel_on_failure=False, future_to_notify=None, - fail_on_first_time_resource_locked=False + fail_on_first_time_resource_locked=False, + fucking_memoryviews=False ): """ :param queue: Queue object, being consumed from right now. @@ -60,6 +61,8 @@ class Consumer(Channeler): of a connection fail - next reconnect will consider this to be SECOND declaration, ie. it will retry ad infinitum :type fail_on_first_time_resource_locked: bool + :param fucking_memoryviews: if you set that to True, bodies of your ReceivedMessages will be iterables + of memoryviews, instead of bytes. Fast as fuck :/ """ super(Consumer, self).__init__() @@ -84,6 +87,7 @@ class Consumer(Channeler): self.future_to_notify = future_to_notify self.fail_on_first_time_resource_locked = fail_on_first_time_resource_locked self.cancel_on_failure = cancel_on_failure + self.fucking_memoryviews = fucking_memoryviews def set_qos(self, prefetch_size, prefetch_count): """ @@ -391,8 +395,15 @@ class MessageReceiver(object): self.acks_pending.add(self.bdeliver.delivery_tag) from coolamqp.objects import ReceivedMessage + + + if self.consumer.fucking_memoryviews: + body = self.body + else: + b''.join((mv.tobytes() for mv in self.body)) + rm = ReceivedMessage( - b''.join(map(six.binary_type, self.body)), #todo inefficient as FUUUCK + body, self.bdeliver.exchange, self.bdeliver.routing_key, self.header.properties, diff --git a/coolamqp/uplink/connection/recv_framer.py b/coolamqp/uplink/connection/recv_framer.py index 4e80955..7d97533 100644 --- a/coolamqp/uplink/connection/recv_framer.py +++ b/coolamqp/uplink/connection/recv_framer.py @@ -75,7 +75,10 @@ class ReceivingFramer(object): def _statemachine(self): # state rule 1 if self.frame_type is None and self.total_data_len > 0: - self.frame_type = ord(self._extract(1)[0]) + if six.PY3: + self.frame_type = self._extract(1)[0] + else: + self.frame_type = ord(self._extract(1)[0]) if self.frame_type not in (FRAME_HEARTBEAT, FRAME_HEADER, FRAME_METHOD, FRAME_BODY): raise ValueError('Invalid frame') @@ -123,7 +126,11 @@ class ReceivingFramer(object): payload = buffer(payload.getvalue()) - if ord(self._extract(1)[0]) != FRAME_END: + z = self._extract(1)[0] + if six.PY2: + z = ord(z) + + if z != FRAME_END: raise ValueError('Invalid frame end') try: diff --git a/tests/test_clustering/test_a.py b/tests/test_clustering/test_a.py index a96764b..f152f09 100644 --- a/tests/test_clustering/test_a.py +++ b/tests/test_clustering/test_a.py @@ -30,7 +30,7 @@ class TestA(unittest.TestCase): # fut.result() con.cancel() - def test_send_recv(self): + def test_send_recv_zerolen(self): P = {'q': False} @@ -46,5 +46,21 @@ class TestA(unittest.TestCase): self.assertTrue(P['q']) + def test_send_recv_nonzerolen(self): + + P = {'q': False} + + def ok(e): + self.assertIsInstance(e, ReceivedMessage) + self.assertEquals(e.body, b'hello') + P['q'] = True + + con, fut = self.c.consume(Queue(u'hello', exclusive=True), on_message=ok, no_ack=True) + fut.result() + self.c.publish(Message(b'hello'), routing_key=u'hello', tx=True).result() + + time.sleep(1) + + self.assertTrue(P['q']) -- GitLab