From 50662d0636402b0f0e369846406ec394e95dc3ce Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Ma=C5=9Blanka?= <piotr.maslanka@henrietta.com.pl> Date: Sat, 24 Dec 2016 05:46:46 +0100 Subject: [PATCH] encoding --- coolamqp/backends/pyamqp.py | 2 +- coolamqp/cluster.py | 2 +- coolamqp/messages.py | 4 ++++ tests/test_basics.py | 2 +- 4 files changed, 7 insertions(+), 3 deletions(-) diff --git a/coolamqp/backends/pyamqp.py b/coolamqp/backends/pyamqp.py index 0b1deb6..4df7edc 100644 --- a/coolamqp/backends/pyamqp.py +++ b/coolamqp/backends/pyamqp.py @@ -144,7 +144,7 @@ class PyAMQPBackend(AMQPBackend): def __on_message(self, message): assert isinstance(message.body, six.binary_type) - self.cluster_handler_thread._on_recvmessage(six.binary_type(message.body), + self.cluster_handler_thread._on_recvmessage(message.body, message.delivery_info['exchange'], message.delivery_info['routing_key'], message.delivery_info['delivery_tag'], diff --git a/coolamqp/cluster.py b/coolamqp/cluster.py index 578ef2a..5d5bb5c 100644 --- a/coolamqp/cluster.py +++ b/coolamqp/cluster.py @@ -86,7 +86,7 @@ class Cluster(object): def send(self, message, exchange='', routing_key='', on_completed=None, on_failed=None): """ Schedule a message to be sent. - :param message: Message object to send + :param message: Message object to send. :param exchange: Exchange to use. Leave None to use the default exchange :param routing_key: routing key to use :param on_completed: callable/0 to call when this succeeds diff --git a/coolamqp/messages.py b/coolamqp/messages.py index c1c518f..4815e34 100644 --- a/coolamqp/messages.py +++ b/coolamqp/messages.py @@ -1,5 +1,6 @@ # coding=UTF-8 import uuid +import six class Message(object): @@ -9,8 +10,10 @@ class Message(object): """ Create a Message object :param body: stream of octets + :type body: str (py2) or bytes (py3) :param properties: AMQP properties to be sent along """ + assert isinstance(body, six.binary_type) self.body = body self.properties = {} if properties is None else properties @@ -21,6 +24,7 @@ class ReceivedMessage(Message): def __init__(self, body, cht, connect_id, exchange_name, routing_key, properties=None, delivery_tag=None): """ :param body: message body. A stream of octets. + :type body: str (py2) or bytes (py3) :param cht: parent ClusterHandlerThread that emitted this message :param connect_id: connection ID. ClusterHandlerThread will check this in order not to ack messages that were received from a dead connection diff --git a/tests/test_basics.py b/tests/test_basics.py index c55764c..90cfc8a 100644 --- a/tests/test_basics.py +++ b/tests/test_basics.py @@ -55,7 +55,7 @@ class TestBasics(unittest.TestCase): p = self.amqp.drain(wait=4) self.assertIsInstance(p, MessageReceived) - self.assertEquals(six.text_type(p.message.body), b'what the fuck') + self.assertEquals(p.message.body, b'what the fuck') def test_bug_hangs(self): p = Queue('lol', exclusive=True) -- GitLab