diff --git a/README.md b/README.md index feb513202082febc2a2c79cbdb181f6c2d3c3d82..6f6c407617adb6a32ae2330062c1d3c11a00df5c 100644 --- a/README.md +++ b/README.md @@ -11,7 +11,8 @@ CoolAMQP A **magical** AMQP client, that uses **heavy sorcery** to achieve speeds that other AMQP clients cannot even hope to match. tl;dr - [this](coolamqp/framing/definitions.py) is **machine-generated** compile-time. -[this](coolamqp/framing/compilation/content_property.py) **generates classes run-time**. +[this](coolamqp/framing/compilation/content_property.py) **generates classes run-time**, +and there are memoryviews **_everywhere_**. The project is actively maintained and used in a commercial project. Tests can run diff --git a/coolamqp/attaches/consumer.py b/coolamqp/attaches/consumer.py index d269516a1b75950db24e88e3603cc56bdd5c9ee7..354afb6b8985ab29df14dca7440fee3407aecb0a 100644 --- a/coolamqp/attaches/consumer.py +++ b/coolamqp/attaches/consumer.py @@ -16,6 +16,9 @@ from coolamqp.exceptions import ResourceLocked, AMQPError logger = logging.getLogger(__name__) +EMPTY_MEMORYVIEW = memoryview(b'') # for empty messages + + class Consumer(Channeler): """ This object represents a consumer in the system. @@ -159,6 +162,7 @@ class Consumer(Channeler): if isinstance(payload, BasicCancel): # Consumer Cancel Notification - by RabbitMQ + # send them back those memoryviews :D self.methods([BasicCancelOk(payload.consumer_tag), ChannelClose(0, b'Received basic.cancel', 0, 0)]) return @@ -188,11 +192,9 @@ class Consumer(Channeler): self.future_to_notify.set_exception(AMQPError(payload)) self.future_to_notify = None - # We might not want to throw the connection away. should_retry = should_retry and (not self.cancelled) - old_con = self.connection super(Consumer, self).on_close(payload) # this None's self.connection @@ -375,7 +377,7 @@ class MessageReceiver(object): if self.header.body_size == 0: # An empty message is no common guest. It won't have a BODY field though... - self.on_body(memoryview(b'')) # trigger it manually + self.on_body(EMPTY_MEMORYVIEW) # trigger it manually def on_basic_deliver(self, payload): assert self.state == 0 diff --git a/coolamqp/framing/compilation/compile_definitions.py b/coolamqp/framing/compilation/compile_definitions.py index a39e42747796b8bd94eaedef8c6ba1254d0a2f08..4c8d5bebd33cba48912b2b9a353ab724036dd6d9 100644 --- a/coolamqp/framing/compilation/compile_definitions.py +++ b/coolamqp/framing/compilation/compile_definitions.py @@ -298,8 +298,12 @@ Field = collections.namedtuple('Field', ('name', 'type', 'basic_type', 'reserved """ # extract property flags pfl = 2 - while ord(buf[offset + pfl - 1]) & 1: - pfl += 2 + if six.PY2: + while ord(buf[offset + pfl - 1]) & 1: + pfl += 2 + else: + while buf[offset + pfl - 1]) & 1: + pfl += 2 zpf = %s.zero_property_flags(buf[offset:offset+pfl]).tobytes() if zpf in %s.PARTICULAR_CLASSES: return %s.PARTICULAR_CLASSES[zpf].from_buffer(buf, offset) diff --git a/coolamqp/objects.py b/coolamqp/objects.py index 9a45df89175fc8f563bd04160cdf87f3ed777071..62809bb171d915ccab2775bcb39d3a24e0c98b3c 100644 --- a/coolamqp/objects.py +++ b/coolamqp/objects.py @@ -75,15 +75,14 @@ class ReceivedMessage(Message): nack=None): """ :param body: message body. A stream of octets. - :type body: str (py2) or bytes (py3) - :param connect_id: connection ID. ClusterHandlerThread will check this in order - not to ack messages that were received from a dead connection + :type body: str (py2) or bytes (py3) or a list of memoryviews, if particular disabled-by-default option + is turned on. :param exchange_name: name of exchange this message was submitted to :type exchange_name: memoryview :param routing_key: routing key with which this message was sent :type routing_key: memoryview - :param properties: a suitable BasicContentPropertyList subinstance - + :param properties: a suitable BasicContentPropertyList subinstance. + be prepared that value of properties that are strings will be memoryviews :param delivery_tag: delivery tag assigned by AMQP broker to confirm this message :param ack: a callable to call when you want to ack (via basic.ack) this message. None if received by the no-ack mechanism