diff --git a/coolamqp/attaches/consumer.py b/coolamqp/attaches/consumer.py index 4c3320eb418a4b1251c0343203bd61e31a4f161c..b17f5a46eb246be538ccd655ac0fcd818a1a31d5 100644 --- a/coolamqp/attaches/consumer.py +++ b/coolamqp/attaches/consumer.py @@ -1,6 +1,7 @@ # coding=UTF-8 from __future__ import absolute_import, division, print_function import six +import io import logging import uuid import warnings @@ -23,15 +24,20 @@ EMPTY_MEMORYVIEW = memoryview(b'') # for empty messages class BodyReceiveMode(object): + # ZC - zero copy + # C - copy (copies every byte once) + BYTES = 0 # message.body will be a single bytes object # this will gather frames as memoryviews, and b''.join() them upon receiving last frame - # this is O(N) + # this is C MEMORYVIEW = 1 # message.body will be returned as a memoryview object - # this is O(1) for single frame messages, and O(N) for multi-frame ones + # this is ZC for small messages, and C for multi-frame ones + # think less than 800B, since 2048 is the buffer for socket recv, and an AMQP + # frame (or many frames!) have to fit there LIST_OF_MEMORYVIEW = 2 # message.body will be returned as list of memoryview objects - # these constitute received pieces. this is always O(1) + # these constitute received pieces. this is always ZC class Consumer(Channeler): """ @@ -523,8 +529,13 @@ class MessageReceiver(object): # Does body need preprocessing? body = self.body if self.recv_mode == BodyReceiveMode.BYTES: - #todo optimize - .tobyes - body = b''.join((mv.tobytes() for mv in body)) + # since b''.join() with list comprehension and .tobytes() would create + # an extra copy of string + bio = io.BytesIO() + for mv in body: + bio.write(mv) + + body = bio.getvalue() # if MEMORYVIEW, then it's already ok rm = ReceivedMessage(