diff --git a/coolamqp/attaches/consumer.py b/coolamqp/attaches/consumer.py index ef275e9af43c5be86e112fe1a9d67060c2bf2a2a..c0e211d5deac79f91d26c133b1a2be2827fbdb34 100644 --- a/coolamqp/attaches/consumer.py +++ b/coolamqp/attaches/consumer.py @@ -375,7 +375,7 @@ class MessageReceiver(object): if self.header.body_size == 0: # An empty message is no common guest. It won't have a BODY field though... - self.on_body(b'') # trigger it manually + self.on_body(memoryview(b'')) # trigger it manually def on_basic_deliver(self, payload): assert self.state == 0 @@ -398,11 +398,9 @@ class MessageReceiver(object): from coolamqp.objects import ReceivedMessage - - if self.consumer.fucking_memoryviews: - body = self.body - else: - b''.join((mv.tobytes() for mv in self.body)) + body = self.body + if not self.consumer.fucking_memoryviews: + body = b''.join((mv.tobytes() for mv in body)) rm = ReceivedMessage( body, diff --git a/coolamqp/framing/base.py b/coolamqp/framing/base.py index 3ee0f7a13b4f537482d26cd7a3fdca3ad69ac6cc..3d7d81d4658cc6c19667d609c25d891bbc79e208 100644 --- a/coolamqp/framing/base.py +++ b/coolamqp/framing/base.py @@ -2,6 +2,7 @@ from __future__ import absolute_import, division, print_function import logging +import six import struct logger = logging.getLogger(__name__) @@ -143,7 +144,7 @@ class AMQPMethodPayload(AMQPPayload): buf.write(struct.pack('!I', self.get_size()+2)) buf.write(self.BINARY_HEADER) self.write_arguments(buf) - buf.write(chr(FRAME_END)) + buf.write(six.int2byte(FRAME_END)) def get_size(self): """ diff --git a/coolamqp/framing/compilation/compile_definitions.py b/coolamqp/framing/compilation/compile_definitions.py index 76558ac5b42fd6631eabe70424a39136a511810b..57e7794583f21db88ebcac5af634efc24835b603 100644 --- a/coolamqp/framing/compilation/compile_definitions.py +++ b/coolamqp/framing/compilation/compile_definitions.py @@ -73,7 +73,13 @@ Field = collections.namedtuple('Field', ('name', 'type', 'basic_type', 'reserved for constant in get_constants(xml): if pythonify_name(constant.name) == 'FRAME_END': FRAME_END = constant.value - g = ffmt('%s = %s', pythonify_name(constant.name), constant.value) + g = ffmt('%s = %s\n', pythonify_name(constant.name), constant.value) + line(g) + if 0 <= constant.value <= 255: + z = repr(six.int2byte(constant.value)) + if not z.startswith(u'b'): + z = u'b' + z + g = ffmt('%s_BYTE = %s\n', pythonify_name(constant.name), z) line(g) if constant.docs: lines = constant.docs.split('\n') @@ -293,7 +299,7 @@ Field = collections.namedtuple('Field', ('name', 'type', 'basic_type', 'reserved pfl = 2 while ord(buf[offset + pfl - 1]) & 1: pfl += 2 - zpf = %s.zero_property_flags(buf[offset:offset+pfl]) + zpf = %s.zero_property_flags(buf[offset:offset+pfl]).tobytes() if zpf in %s.PARTICULAR_CLASSES: return %s.PARTICULAR_CLASSES[zpf].from_buffer(buf, offset) else: diff --git a/coolamqp/framing/definitions.py b/coolamqp/framing/definitions.py index 55b348e4deb5f46de016821baa469db1d5587492..8a9e7d8cb9f56227ac7d587864c5361ac5819f6c 100644 --- a/coolamqp/framing/definitions.py +++ b/coolamqp/framing/definitions.py @@ -31,51 +31,99 @@ Field = collections.namedtuple('Field', ('name', 'type', 'basic_type', 'reserved # Core constants FRAME_METHOD = 1 +FRAME_METHOD_BYTE = b'\x01' + FRAME_HEADER = 2 +FRAME_HEADER_BYTE = b'\x02' + FRAME_BODY = 3 +FRAME_BODY_BYTE = b'\x03' + FRAME_HEARTBEAT = 8 +FRAME_HEARTBEAT_BYTE = b'\x08' + +FRAME_MIN_SIZE = 4096 FRAME_MIN_SIZE = 4096 + FRAME_END = 206 -REPLY_SUCCESS = 200 # Indicates that the method completed successfully. This reply code is - # reserved for future use - the current protocol design does not use positive - # confirmation and reply codes are sent only in case of an error. -CONTENT_TOO_LARGE = 311 # The client attempted to transfer content larger than the server could accept - # at the present time. The client may retry at a later time. -NO_CONSUMERS = 313 # When the exchange cannot deliver to a consumer when the immediate flag is - # set. As a result of pending data on the queue or the absence of any - # consumers of the queue. -CONNECTION_FORCED = 320 # An operator intervened to close the connection for some reason. The client - # may retry at some later date. -INVALID_PATH = 402 # The client tried to work with an unknown virtual host. -ACCESS_REFUSED = 403 # The client attempted to work with a server entity to which it has no - # access due to security settings. -NOT_FOUND = 404 # The client attempted to work with a server entity that does not exist. -RESOURCE_LOCKED = 405 # The client attempted to work with a server entity to which it has no - # access because another client is working with it. -PRECONDITION_FAILED = 406 # The client requested a method that was not allowed because some precondition - # failed. -FRAME_ERROR = 501 # The sender sent a malformed frame that the recipient could not decode. - # This strongly implies a programming error in the sending peer. -SYNTAX_ERROR = 502 # The sender sent a frame that contained illegal values for one or more - # fields. This strongly implies a programming error in the sending peer. -COMMAND_INVALID = 503 # The client sent an invalid sequence of frames, attempting to perform an - # operation that was considered invalid by the server. This usually implies - # a programming error in the client. -CHANNEL_ERROR = 504 # The client attempted to work with a channel that had not been correctly - # opened. This most likely indicates a fault in the client layer. -UNEXPECTED_FRAME = 505 # The peer sent a frame that was not expected, usually in the context of - # a content header and body. This strongly indicates a fault in the peer's - # content processing. -RESOURCE_ERROR = 506 # The server could not complete the method because it lacked sufficient - # resources. This may be due to the client creating too many of some type - # of entity. -NOT_ALLOWED = 530 # The client tried to work with some entity in a manner that is prohibited - # by the server, due to security settings or by some other criteria. -NOT_IMPLEMENTED = 540 # The client tried to use functionality that is not implemented in the - # server. -INTERNAL_ERROR = 541 # The server could not complete the method because of an internal error. - # The server may require intervention by an operator in order to resume - # normal operations. +FRAME_END_BYTE = b'\xce' + +REPLY_SUCCESS = 200 +REPLY_SUCCESS_BYTE = b'\xc8' + # Indicates that the method completed successfully. This reply code is + # reserved for future use - the current protocol design does not use positive + # confirmation and reply codes are sent only in case of an error. +CONTENT_TOO_LARGE = 311 +CONTENT_TOO_LARGE = 311 + # The client attempted to transfer content larger than the server could accept + # at the present time. The client may retry at a later time. +NO_CONSUMERS = 313 +NO_CONSUMERS = 313 + # When the exchange cannot deliver to a consumer when the immediate flag is + # set. As a result of pending data on the queue or the absence of any + # consumers of the queue. +CONNECTION_FORCED = 320 +CONNECTION_FORCED = 320 + # An operator intervened to close the connection for some reason. The client + # may retry at some later date. +INVALID_PATH = 402 +INVALID_PATH = 402 + # The client tried to work with an unknown virtual host. +ACCESS_REFUSED = 403 +ACCESS_REFUSED = 403 + # The client attempted to work with a server entity to which it has no + # access due to security settings. +NOT_FOUND = 404 +NOT_FOUND = 404 + # The client attempted to work with a server entity that does not exist. +RESOURCE_LOCKED = 405 +RESOURCE_LOCKED = 405 + # The client attempted to work with a server entity to which it has no + # access because another client is working with it. +PRECONDITION_FAILED = 406 +PRECONDITION_FAILED = 406 + # The client requested a method that was not allowed because some precondition + # failed. +FRAME_ERROR = 501 +FRAME_ERROR = 501 + # The sender sent a malformed frame that the recipient could not decode. + # This strongly implies a programming error in the sending peer. +SYNTAX_ERROR = 502 +SYNTAX_ERROR = 502 + # The sender sent a frame that contained illegal values for one or more + # fields. This strongly implies a programming error in the sending peer. +COMMAND_INVALID = 503 +COMMAND_INVALID = 503 + # The client sent an invalid sequence of frames, attempting to perform an + # operation that was considered invalid by the server. This usually implies + # a programming error in the client. +CHANNEL_ERROR = 504 +CHANNEL_ERROR = 504 + # The client attempted to work with a channel that had not been correctly + # opened. This most likely indicates a fault in the client layer. +UNEXPECTED_FRAME = 505 +UNEXPECTED_FRAME = 505 + # The peer sent a frame that was not expected, usually in the context of + # a content header and body. This strongly indicates a fault in the peer's + # content processing. +RESOURCE_ERROR = 506 +RESOURCE_ERROR = 506 + # The server could not complete the method because it lacked sufficient + # resources. This may be due to the client creating too many of some type + # of entity. +NOT_ALLOWED = 530 +NOT_ALLOWED = 530 + # The client tried to work with some entity in a manner that is prohibited + # by the server, due to security settings or by some other criteria. +NOT_IMPLEMENTED = 540 +NOT_IMPLEMENTED = 540 + # The client tried to use functionality that is not implemented in the + # server. +INTERNAL_ERROR = 541 +INTERNAL_ERROR = 541 + # The server could not complete the method because of an internal error. + # The server may require intervention by an operator in order to resume + # normal operations. HARD_ERROR = [CONNECTION_FORCED, INVALID_PATH, FRAME_ERROR, SYNTAX_ERROR, COMMAND_INVALID, CHANNEL_ERROR, UNEXPECTED_FRAME, RESOURCE_ERROR, NOT_ALLOWED, NOT_IMPLEMENTED, INTERNAL_ERROR] SOFT_ERROR = [CONTENT_TOO_LARGE, NO_CONSUMERS, ACCESS_REFUSED, NOT_FOUND, RESOURCE_LOCKED, PRECONDITION_FAILED] diff --git a/coolamqp/framing/frames.py b/coolamqp/framing/frames.py index f45fe5ee263384d9290364a00770ecd440cc21ae..c819a30134bc7da2cbafcaeaae0ecbf50712b290 100644 --- a/coolamqp/framing/frames.py +++ b/coolamqp/framing/frames.py @@ -9,7 +9,8 @@ import six from coolamqp.framing.base import AMQPFrame from coolamqp.framing.definitions import FRAME_METHOD, FRAME_HEARTBEAT, FRAME_BODY, FRAME_HEADER, FRAME_END, \ - IDENT_TO_METHOD, CLASS_ID_TO_CONTENT_PROPERTY_LIST + IDENT_TO_METHOD, CLASS_ID_TO_CONTENT_PROPERTY_LIST, FRAME_METHOD_BYTE, FRAME_BODY_BYTE, FRAME_HEADER_BYTE, \ + FRAME_END_BYTE class AMQPMethodFrame(AMQPFrame): @@ -32,7 +33,7 @@ class AMQPMethodFrame(AMQPFrame): 4 + self.payload.get_size())) buf.write(self.payload.BINARY_HEADER) self.payload.write_arguments(buf) - buf.write(chr(FRAME_END)) + buf.write(FRAME_END_BYTE) @staticmethod def unserialize(channel, payload_as_buffer): @@ -72,7 +73,7 @@ class AMQPHeaderFrame(AMQPFrame): buf.write(struct.pack('!BHLHHQ', FRAME_HEADER, self.channel, 12+self.properties.get_size(), self.class_id, 0, self.body_size)) self.properties.write_to(buf) - buf.write(chr(FRAME_END)) + buf.write(FRAME_END_BYTE) @staticmethod def unserialize(channel, payload_as_buffer): @@ -102,7 +103,7 @@ class AMQPBodyFrame(AMQPFrame): def write_to(self, buf): buf.write(struct.pack('!BHL', FRAME_BODY, self.channel, len(self.data))) buf.write(self.data) - buf.write(chr(FRAME_END)) + buf.write(FRAME_END_BYTE) @staticmethod def unserialize(channel, payload_as_buffer): diff --git a/coolamqp/objects.py b/coolamqp/objects.py index c9b23b2b216a1cbcf5c34b060acf1dd8832fb62a..e9cd9c6c27226a023d9e9c962e21aa904cbfe1a8 100644 --- a/coolamqp/objects.py +++ b/coolamqp/objects.py @@ -34,7 +34,8 @@ class Message(object): Please take care with passing empty bodies, as py-amqp has some failure on it. :param body: stream of octets - :type body: str (py2) or bytes (py3) + :type body: str (py2) or bytes (py3) or ... + a list of memoryviews, if you find the right option. It's disabled by default. :param properties: AMQP properties to be sent along. default is 'no properties at all' You can pass a dict - it will be passed to MessageProperties, @@ -43,7 +44,8 @@ class Message(object): """ if isinstance(body, six.text_type): raise TypeError('body cannot be a text type!') - self.body = six.binary_type(body) + + self.body = body if isinstance(properties, dict): self.properties = MessageProperties(**properties)