From 0571507509231a6067df0cc8c81d6beca2407124 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Ma=C5=9Blanka?= <piotr.maslanka@henrietta.com.pl> Date: Sun, 25 Dec 2016 02:21:43 +0100 Subject: [PATCH] v0.11 RC, submit for tests lol, found a py-amqp bug --- README.md | 5 +- coolamqp/backends/pyamqp.py | 2 +- coolamqp/handler.py | 12 ++--- coolamqp/messages.py | 7 ++- tests/test_failures.py | 3 ++ tests/test_noack.py | 94 +++++++++++++++++++++++++------------ 6 files changed, 84 insertions(+), 39 deletions(-) diff --git a/README.md b/README.md index d8ccb83..d82c485 100644 --- a/README.md +++ b/README.md @@ -28,4 +28,7 @@ Enjoy! # Changelog -* v0.11 - added *no_ack* to *consume*, can pass other non-text types to Message \ No newline at end of file +## v0.11 +* added *no_ack* to *consume* +* can pass other non-text types to Message +* can set global bit in *qos* \ No newline at end of file diff --git a/coolamqp/backends/pyamqp.py b/coolamqp/backends/pyamqp.py index 26e8686..0b402eb 100644 --- a/coolamqp/backends/pyamqp.py +++ b/coolamqp/backends/pyamqp.py @@ -21,7 +21,7 @@ def translate_exceptions(fun): return fun(*args, **kwargs) except amqp.RecoverableChannelError as e: raise RemoteAMQPError(e.reply_code, e.reply_text) - except (IOError, amqp.ConnectionForced, amqp.IrrecoverableChannelError) as e: + except (IOError, amqp.ConnectionForced, amqp.IrrecoverableChannelError, amqp.exceptions.UnexpectedFrame) as e: msg = e.message if six.PY2 else e.args[0] raise ConnectionFailedError(msg) return q diff --git a/coolamqp/handler.py b/coolamqp/handler.py index 8ac0041..f026a8a 100644 --- a/coolamqp/handler.py +++ b/coolamqp/handler.py @@ -62,7 +62,8 @@ class ClusterHandlerThread(threading.Thread): self.backend = self.cluster.backend(node, self) if self.qos is not None: - self.backend.basic_qos(*self.qos) + pre_siz, pre_cou, glob = self.qos + self.backend.basic_qos(pre_siz, pre_cou, glob) for exchange in self.declared_exchanges.values(): self.backend.exchange_declare(exchange) @@ -104,7 +105,8 @@ class ClusterHandlerThread(threading.Thread): self.backend.basic_publish(order.message, order.exchange, order.routing_key) elif isinstance(order, SetQoS): self.qos = order.qos - self.backend.basic_qos(*self.qos) + pre_siz, pre_cou, glob = order.qos + self.backend.basic_qos(pre_siz, pre_cou, glob) elif isinstance(order, DeclareExchange): self.backend.exchange_declare(order.exchange) self.declared_exchanges[order.exchange.name] = order.exchange @@ -175,12 +177,10 @@ class ClusterHandlerThread(threading.Thread): assert self.is_terminating if self.cluster.connected or (self.backend is not None): - try: + if self.backend is not None: self.backend.shutdown() - except AttributeError: - pass # backend might be None - the if condition is "or" after all + self.backend = None - self.backend = None self.cluster.connected = False def terminate(self): diff --git a/coolamqp/messages.py b/coolamqp/messages.py index 8fe56d7..c28e19f 100644 --- a/coolamqp/messages.py +++ b/coolamqp/messages.py @@ -8,7 +8,10 @@ class Message(object): def __init__(self, body, properties=None): """ - Create a Message object + Create a Message object. + + Please take care with passing empty bodies, as py-amqp has some failure on it. + :param body: stream of octets :type body: str (py2) or bytes (py3) :param properties: AMQP properties to be sent along @@ -16,7 +19,7 @@ class Message(object): if isinstance(body, six.text_type): raise TypeError('body cannot be a text type!') self.body = six.binary_type(body) - self.properties = {} if properties is None else properties + self.properties = properties or {} class ReceivedMessage(Message): diff --git a/tests/test_failures.py b/tests/test_failures.py index 2349fc9..f091b41 100644 --- a/tests/test_failures.py +++ b/tests/test_failures.py @@ -38,6 +38,9 @@ class TestFailures(unittest.TestCase): def tearDown(self): self.amqp.shutdown() + def test_cancel_not_consumed_queue(self): + self.amqp.cancel(Queue('hello world')).result() + def test_connection_down_and_up(self): """Are ConnectionUp/Down messages generated at all? does it reconnect?""" os.system("sudo service rabbitmq-server restart") diff --git a/tests/test_noack.py b/tests/test_noack.py index f134487..e50d8ba 100644 --- a/tests/test_noack.py +++ b/tests/test_noack.py @@ -3,16 +3,24 @@ from __future__ import absolute_import, division, print_function import unittest import six import os +import time from coolamqp import Cluster, ClusterNode, Queue, MessageReceived, ConnectionUp, ConnectionDown, ConsumerCancelled, Message, Exchange +class TestNoAcknowledge(unittest.TestCase): + def drainTo(self, type_, timeout=20): + start = time.time() + while time.time() - start < timeout: + q = self.amqp.drain(1) + if isinstance(q, type_): + return q + self.fail('Did not find %s' % (type_, )) -class TestNoAcknowledge(unittest.TestCase): def setUp(self): self.amqp = Cluster([ClusterNode('127.0.0.1', 'guest', 'guest')]) self.amqp.start() - self.assertIsInstance(self.amqp.drain(1), ConnectionUp) + self.drainTo(ConnectionUp, timeout=1) def tearDown(self): self.amqp.shutdown() @@ -28,9 +36,9 @@ class TestNoAcknowledge(unittest.TestCase): self.amqp.send(Message(b'what the fuck'), '', routing_key='myqueue') self.amqp.send(Message(b'what the fuck'), '', routing_key='myqueue') - self.assertIsInstance(self.amqp.drain(wait=1), MessageReceived) - self.assertIsInstance(self.amqp.drain(wait=0.3), MessageReceived) - self.assertIsInstance(self.amqp.drain(wait=0.3), MessageReceived) + self.drainTo(MessageReceived) + self.drainTo(MessageReceived) + self.drainTo(MessageReceived) def test_noack_works_after_restart(self): myq = Queue('myqueue', exclusive=True) @@ -51,50 +59,78 @@ class TestNoAcknowledge(unittest.TestCase): self.assertIsInstance(self.amqp.drain(wait=5), ConnectionDown) self.assertIsInstance(self.amqp.drain(wait=5), ConnectionUp) - self.amqp.send(Message(b'what the fuck'), '', routing_key='myqueue') - self.amqp.send(Message(b'what the fuck'), '', routing_key='myqueue') - self.amqp.send(Message(b'what the fuck'), '', routing_key='myqueue') + self.amqp.send(Message(b'what the fuck'), routing_key='myqueue') + self.amqp.send(Message(b'what the fuck'), routing_key='myqueue') + self.amqp.send(Message(b'what the fuck'), routing_key='myqueue') self.assertIsInstance(self.amqp.drain(wait=1), MessageReceived) self.assertIsInstance(self.amqp.drain(wait=0.3), MessageReceived) self.assertIsInstance(self.amqp.drain(wait=0.3), MessageReceived) def test_noack_coexists(self): - myq = Queue('myqueue', exclusive=True) - my2 = Queue('myqueue2', exclusive=True) - self.amqp.qos(0, 1, False) - self.amqp.consume(myq, no_ack=True) - self.amqp.consume(my2) - - msg = Message(b'') + self.amqp.consume(Queue('myqueue', exclusive=True), no_ack=True) + self.amqp.consume(Queue('myqueue2', exclusive=True)) - self.amqp.send(msg, routing_key='myqueue') - self.amqp.send(msg, routing_key='myqueue') - self.amqp.send(msg, routing_key='myqueue') + msg = Message(b'zz') - self.amqp.send(msg, routing_key='myqueue2') - self.amqp.send(msg, routing_key='myqueue2') - self.amqp.send(msg, routing_key='myqueue2').result() + for i in range(3): + self.amqp.send(msg, routing_key='myqueue') + self.amqp.send(msg, routing_key='myqueue2') - our_message = None + mq2s = [] for i in range(4): - mer = self.amqp.drain(wait=2) - self.assertIsInstance(mer, MessageReceived) + # I should have received 3 messages from myqueue, and 2 from myqueue2 + print('beng') + mer = self.drainTo(MessageReceived) if mer.message.routing_key == 'myqueue2': - self.assertIsNone(our_message) - our_message = mer - self.assertIsNotNone(our_message) + mq2s.append(mer.message) # Should receive nothing, since not acked - self.assertIsNone(self.amqp.drain(wait=2)) + self.assertIsNone(self.amqp.drain(wait=4)) + + self.assertEquals(len(mq2s), 1) # ack and receive - our_message.message.ack() + for me in mq2s: me.ack() mer = self.amqp.drain(wait=1) # 2nd self.assertIsInstance(mer, MessageReceived) mer.message.ack() mer = self.amqp.drain(wait=1) # 3rd self.assertIsInstance(mer, MessageReceived) mer.message.ack() + + @unittest.skip('demonstrates a py-amqp bug') + def test_noack_coexists_empty_message_body(self): + self.amqp.qos(0, 1, False) + + self.amqp.consume(Queue('myqueue', exclusive=True), no_ack=True) + self.amqp.consume(Queue('myqueue2', exclusive=True)) + + msg = Message(b'') # if this is empty, py-amqp fails + + for i in range(3): + self.amqp.send(msg, routing_key='myqueue') + self.amqp.send(msg, routing_key='myqueue2') + + mq2s = [] + for i in range(4): + # I should have received 3 messages from myqueue, and 2 from myqueue2 + mer = self.drainTo(MessageReceived) + if mer.message.routing_key == 'myqueue2': + mq2s.append(mer.message) + + # Should receive nothing, since not acked + self.assertIsNone(self.amqp.drain(wait=4)) + + self.assertEquals(len(mq2s), 1) + + # ack and receive + for me in mq2s: me.ack() + mer = self.amqp.drain(wait=1) # 2nd + self.assertIsInstance(mer, MessageReceived) + mer.message.ack() + mer = self.amqp.drain(wait=1) # 3rd + self.assertIsInstance(mer, MessageReceived) + mer.message.ack() \ No newline at end of file -- GitLab