diff --git a/coolamqp/backends/pyamqp.py b/coolamqp/backends/pyamqp.py index 0b1deb61f7873c1ced286fd1b5acb87e9b012ffd..4df7edcdf3011e390840bcf519a27e3fc7de1b0e 100644 --- a/coolamqp/backends/pyamqp.py +++ b/coolamqp/backends/pyamqp.py @@ -144,7 +144,7 @@ class PyAMQPBackend(AMQPBackend): def __on_message(self, message): assert isinstance(message.body, six.binary_type) - self.cluster_handler_thread._on_recvmessage(six.binary_type(message.body), + self.cluster_handler_thread._on_recvmessage(message.body, message.delivery_info['exchange'], message.delivery_info['routing_key'], message.delivery_info['delivery_tag'], diff --git a/coolamqp/cluster.py b/coolamqp/cluster.py index 578ef2aaa79c57c16cd9760e192543d13d2eaa44..5d5bb5c19efe225f4d9043707464fa5c6842e2f7 100644 --- a/coolamqp/cluster.py +++ b/coolamqp/cluster.py @@ -86,7 +86,7 @@ class Cluster(object): def send(self, message, exchange='', routing_key='', on_completed=None, on_failed=None): """ Schedule a message to be sent. - :param message: Message object to send + :param message: Message object to send. :param exchange: Exchange to use. Leave None to use the default exchange :param routing_key: routing key to use :param on_completed: callable/0 to call when this succeeds diff --git a/coolamqp/messages.py b/coolamqp/messages.py index c1c518f145d3f1eb49b6575493d3bc09899f11d5..4815e3440dac4980f8f16da4b71afd408bbc1b9f 100644 --- a/coolamqp/messages.py +++ b/coolamqp/messages.py @@ -1,5 +1,6 @@ # coding=UTF-8 import uuid +import six class Message(object): @@ -9,8 +10,10 @@ class Message(object): """ Create a Message object :param body: stream of octets + :type body: str (py2) or bytes (py3) :param properties: AMQP properties to be sent along """ + assert isinstance(body, six.binary_type) self.body = body self.properties = {} if properties is None else properties @@ -21,6 +24,7 @@ class ReceivedMessage(Message): def __init__(self, body, cht, connect_id, exchange_name, routing_key, properties=None, delivery_tag=None): """ :param body: message body. A stream of octets. + :type body: str (py2) or bytes (py3) :param cht: parent ClusterHandlerThread that emitted this message :param connect_id: connection ID. ClusterHandlerThread will check this in order not to ack messages that were received from a dead connection diff --git a/tests/test_basics.py b/tests/test_basics.py index c55764cd901c617bd315859264c538a27220dcaa..90cfc8a07519dacb3291dc33aac4eb668ee3729f 100644 --- a/tests/test_basics.py +++ b/tests/test_basics.py @@ -55,7 +55,7 @@ class TestBasics(unittest.TestCase): p = self.amqp.drain(wait=4) self.assertIsInstance(p, MessageReceived) - self.assertEquals(six.text_type(p.message.body), b'what the fuck') + self.assertEquals(p.message.body, b'what the fuck') def test_bug_hangs(self): p = Queue('lol', exclusive=True)