From 5eeb277b8c10c438e9beabc8e43c1131fddff956 Mon Sep 17 00:00:00 2001 From: Piotr Maslanka <piotr.maslanka@henrietta.com.pl> Date: Sun, 5 Feb 2017 05:51:43 +0100 Subject: [PATCH] solved the body/memoryview problem --- coolamqp/attaches/__init__.py | 2 +- coolamqp/attaches/consumer.py | 69 +++++++++++++++++++++++++++------ tests/run.py | 2 +- tests/test_clustering/test_a.py | 54 +++++++++++++++++++------- 4 files changed, 101 insertions(+), 26 deletions(-) diff --git a/coolamqp/attaches/__init__.py b/coolamqp/attaches/__init__.py index aaf4ba0..f13c492 100644 --- a/coolamqp/attaches/__init__.py +++ b/coolamqp/attaches/__init__.py @@ -12,7 +12,7 @@ Multiple attaches can be "abstracted" as single one via AttacheGroup (which is a EVERYTHING HERE IS CALLED BY LISTENER THREAD UNLESS STATED OTHERWISE. """ -from coolamqp.attaches.consumer import Consumer +from coolamqp.attaches.consumer import Consumer, BodyReceiveMode from coolamqp.attaches.publisher import Publisher from coolamqp.attaches.agroup import AttacheGroup from coolamqp.attaches.declarer import Declarer \ No newline at end of file diff --git a/coolamqp/attaches/consumer.py b/coolamqp/attaches/consumer.py index 664c692..4c3320e 100644 --- a/coolamqp/attaches/consumer.py +++ b/coolamqp/attaches/consumer.py @@ -22,6 +22,17 @@ logger = logging.getLogger(__name__) EMPTY_MEMORYVIEW = memoryview(b'') # for empty messages +class BodyReceiveMode(object): + BYTES = 0 # message.body will be a single bytes object + # this will gather frames as memoryviews, and b''.join() them upon receiving last frame + # this is O(N) + + MEMORYVIEW = 1 # message.body will be returned as a memoryview object + # this is O(1) for single frame messages, and O(N) for multi-frame ones + + LIST_OF_MEMORYVIEW = 2 # message.body will be returned as list of memoryview objects + # these constitute received pieces. this is always O(1) + class Consumer(Channeler): """ This object represents a consumer in the system. @@ -57,10 +68,12 @@ class Consumer(Channeler): """ + + def __init__(self, queue, on_message, no_ack=True, qos=None, cancel_on_failure=False, future_to_notify=None, fail_on_first_time_resource_locked=False, - fucking_memoryviews=False + body_receive_mode=BodyReceiveMode.BYTES ): """ Note that if you specify QoS, it is applied before basic.consume is sent. This will prevent @@ -88,10 +101,8 @@ class Consumer(Channeler): of a connection fail - next reconnect will consider this to be SECOND declaration, ie. it will retry ad infinitum :type fail_on_first_time_resource_locked: bool - :param fucking_memoryviews: if you set that to True, bodies of your ReceivedMessages will be iterables - of memoryviews, instead of bytes. - AT LEAST YOU WILL BE FAST AS FUUUUCK - :type fucking_memoryviews: bool + :param body_receive_mode: how should message.body be received. This has a performance impact + :type body_receive_mode: BodyReceiveMode.* """ super(Consumer, self).__init__() @@ -118,7 +129,7 @@ class Consumer(Channeler): self.fail_on_first_time_resource_locked = fail_on_first_time_resource_locked self.cancel_on_failure = cancel_on_failure - self.fucking_memoryviews = fucking_memoryviews + self.body_receive_mode = body_receive_mode self.consumer_tag = None @@ -406,11 +417,23 @@ class MessageReceiver(object): self.bdeliver = None # payload of Basic-Deliver self.header = None # AMQPHeaderFrame - self.body = [] # list of payloads + if consumer.body_receive_mode == BodyReceiveMode.MEMORYVIEW: + self.body = None # None is an important sign - first piece of message + else: + self.body = [] # list of payloads self.data_to_go = None # set on receiving header, how much bytes we need yet + self.message_size = None # in bytes, of currently received message + self.offset = 0 # used only in MEMORYVIEW mode - pointer to self.body (which would be a buffer) self.acks_pending = set() # list of things to ack/reject + self.recv_mode = consumer.body_receive_mode + # if BYTES, pieces (as mvs) are received into .body and b''.join()ed at the end + # if MEMORYVIEW: + # upon first piece, if it's a single-frame message, it's returned at once + # if multiframe, self.body is made into a buffer and further are received into it + # if LIST_OF_MEMORYVIEW, pieces (as mvs) are stored into .body, and that's returned + def on_gone(self): """Called by Consumer to inform upon discarding this receiver""" self.state = 3 @@ -450,7 +473,7 @@ class MessageReceiver(object): def on_head(self, frame): assert self.state == 1 self.header = frame - self.data_to_go = frame.body_size + self.message_size = self.data_to_go = frame.body_size self.state = 2 if self.header.body_size == 0: @@ -465,8 +488,26 @@ class MessageReceiver(object): def on_body(self, payload): """:type payload: buffer""" assert self.state == 2 - self.body.append(payload) self.data_to_go -= len(payload) + + if self.recv_mode == BodyReceiveMode.MEMORYVIEW: + + if self.body is not None: + # continuing a multipart + self.body[self.offset:self.offset + len(payload)] = payload + self.offset += len(payload) + else: + # new one + if self.data_to_go == 0: # special case - single frame message + self.body = payload + else: + self.body = memoryview(bytearray(self.message_size)) + self.body[0:len(payload)] = payload + self.offset = len(payload) + + else: # BYTES and LIST_OF_MEMORYVIEW + self.body.append(payload) + assert self.data_to_go >= 0 if self.data_to_go == 0: @@ -479,9 +520,12 @@ class MessageReceiver(object): from coolamqp.objects import ReceivedMessage + # Does body need preprocessing? body = self.body - if not self.consumer.fucking_memoryviews: + if self.recv_mode == BodyReceiveMode.BYTES: + #todo optimize - .tobyes body = b''.join((mv.tobytes() for mv in body)) + # if MEMORYVIEW, then it's already ok rm = ReceivedMessage( body, @@ -498,4 +542,7 @@ class MessageReceiver(object): self.state = 0 # at this point it's safe to clear the body - self.body = [] + if self.recv_mode == BodyReceiveMode.MEMORYVIEW: + self.body = None + else: + self.body = [] diff --git a/tests/run.py b/tests/run.py index 5e7d185..e574f3c 100644 --- a/tests/run.py +++ b/tests/run.py @@ -16,7 +16,7 @@ amqp.start(wait=True) q = Queue(u'lolwut', auto_delete=True, exclusive=True) -c,f=amqp.consume(q, no_ack=True) +c,f=amqp.consume(q, no_ack=True, body_receive_mode=1) #time.sleep(30) diff --git a/tests/test_clustering/test_a.py b/tests/test_clustering/test_a.py index ad56925..48c5e0d 100644 --- a/tests/test_clustering/test_a.py +++ b/tests/test_clustering/test_a.py @@ -145,6 +145,7 @@ class TestA(unittest.TestCase): def test_send_recv_nonzerolen(self): + """with callback function""" P = {'q': False} @@ -161,23 +162,50 @@ class TestA(unittest.TestCase): self.assertTrue(P['q']) - def test_send_recv_nonzerolen_fuckingmemoryviews(self): + def test_send_recv_nonzerolen_memoryview(self): + """single and multi frame""" + from coolamqp.attaches import BodyReceiveMode - P = {'q': False} - - def ok(e): - self.assertIsInstance(e, ReceivedMessage) - self.assertIsInstance(e.body[0], memoryview) - P['q'] = True - - con, fut = self.c.consume(Queue(u'hello', exclusive=True), on_message=ok, no_ack=True, fucking_memoryviews=True) + con, fut = self.c.consume(Queue(u'hello', exclusive=True), no_ack=True, + body_receive_mode=BodyReceiveMode.MEMORYVIEW) fut.result() - self.c.publish(Message(b'hello'), routing_key=u'hello', tx=True).result() - - time.sleep(1) - self.assertTrue(P['q']) + data = b'hello' + self.c.publish(Message(data), routing_key=u'hello', confirm=True) + m = self.c.drain(2) + self.assertIsInstance(m, MessageReceived) + self.assertIsInstance(m.body, memoryview) + self.assertEquals(m.body, data) + + data = six.binary_type(os.urandom(512 * 1024)) + self.c.publish(Message(data), routing_key=u'hello', confirm=True) + m = self.c.drain(9) + self.assertIsInstance(m, MessageReceived) + self.assertIsInstance(m.body, memoryview) + print(len(m.body)) + self.assertEquals(m.body.tobytes(), data) + + def test_send_recv_nonzerolen_listofmemoryview(self): + """single and multi frame""" + from coolamqp.attaches import BodyReceiveMode + + con, fut = self.c.consume(Queue(u'hello', exclusive=True), no_ack=True, + body_receive_mode=BodyReceiveMode.LIST_OF_MEMORYVIEW) + fut.result() + data = b'hello' + self.c.publish(Message(data), routing_key=u'hello', confirm=True) + m = self.c.drain(1) + self.assertIsInstance(m, MessageReceived) + self.assertIsInstance(m.body[0], memoryview) + self.assertEquals(m.body[0], data) + + data = six.binary_type(os.urandom(512 * 1024)) + self.c.publish(Message(data), routing_key=u'hello', confirm=True) + m = self.c.drain(5) + self.assertIsInstance(m, MessageReceived) + self.assertTrue(all([isinstance(x, memoryview) for x in m.body])) + self.assertEquals(b''.join(x.tobytes() for x in m.body), data) def test_consumer_cancel(self): con, fut = self.c.consume(Queue(u'hello', exclusive=True, auto_delete=True)) -- GitLab