diff --git a/README.md b/README.md index d8ccb83679594999cda614b960d8ee85781ff62b..d82c4859f52b8115bbb10437ecb7d137f092b43b 100644 --- a/README.md +++ b/README.md @@ -28,4 +28,7 @@ Enjoy! # Changelog -* v0.11 - added *no_ack* to *consume*, can pass other non-text types to Message \ No newline at end of file +## v0.11 +* added *no_ack* to *consume* +* can pass other non-text types to Message +* can set global bit in *qos* \ No newline at end of file diff --git a/coolamqp/backends/pyamqp.py b/coolamqp/backends/pyamqp.py index 26e86861a6cb9bf6af701820bd5fa73f0f550f3e..0b402eba5b4e820af58abe43f72ae86a6d91f999 100644 --- a/coolamqp/backends/pyamqp.py +++ b/coolamqp/backends/pyamqp.py @@ -21,7 +21,7 @@ def translate_exceptions(fun): return fun(*args, **kwargs) except amqp.RecoverableChannelError as e: raise RemoteAMQPError(e.reply_code, e.reply_text) - except (IOError, amqp.ConnectionForced, amqp.IrrecoverableChannelError) as e: + except (IOError, amqp.ConnectionForced, amqp.IrrecoverableChannelError, amqp.exceptions.UnexpectedFrame) as e: msg = e.message if six.PY2 else e.args[0] raise ConnectionFailedError(msg) return q diff --git a/coolamqp/handler.py b/coolamqp/handler.py index 8ac0041b443e9321e7dd8aca395dafa220b3a0b5..f026a8ae5a2baff022e00d183df32e905430a289 100644 --- a/coolamqp/handler.py +++ b/coolamqp/handler.py @@ -62,7 +62,8 @@ class ClusterHandlerThread(threading.Thread): self.backend = self.cluster.backend(node, self) if self.qos is not None: - self.backend.basic_qos(*self.qos) + pre_siz, pre_cou, glob = self.qos + self.backend.basic_qos(pre_siz, pre_cou, glob) for exchange in self.declared_exchanges.values(): self.backend.exchange_declare(exchange) @@ -104,7 +105,8 @@ class ClusterHandlerThread(threading.Thread): self.backend.basic_publish(order.message, order.exchange, order.routing_key) elif isinstance(order, SetQoS): self.qos = order.qos - self.backend.basic_qos(*self.qos) + pre_siz, pre_cou, glob = order.qos + self.backend.basic_qos(pre_siz, pre_cou, glob) elif isinstance(order, DeclareExchange): self.backend.exchange_declare(order.exchange) self.declared_exchanges[order.exchange.name] = order.exchange @@ -175,12 +177,10 @@ class ClusterHandlerThread(threading.Thread): assert self.is_terminating if self.cluster.connected or (self.backend is not None): - try: + if self.backend is not None: self.backend.shutdown() - except AttributeError: - pass # backend might be None - the if condition is "or" after all + self.backend = None - self.backend = None self.cluster.connected = False def terminate(self): diff --git a/coolamqp/messages.py b/coolamqp/messages.py index 8fe56d7aab6fc67f6ab07dd0ce07871c3a126bca..c28e19f4d89122cb8f33c81c7bc26d9f9ca2c8e4 100644 --- a/coolamqp/messages.py +++ b/coolamqp/messages.py @@ -8,7 +8,10 @@ class Message(object): def __init__(self, body, properties=None): """ - Create a Message object + Create a Message object. + + Please take care with passing empty bodies, as py-amqp has some failure on it. + :param body: stream of octets :type body: str (py2) or bytes (py3) :param properties: AMQP properties to be sent along @@ -16,7 +19,7 @@ class Message(object): if isinstance(body, six.text_type): raise TypeError('body cannot be a text type!') self.body = six.binary_type(body) - self.properties = {} if properties is None else properties + self.properties = properties or {} class ReceivedMessage(Message): diff --git a/tests/test_failures.py b/tests/test_failures.py index 2349fc922da395f0b683318e2f89a26dc4d4baf6..f091b41c339f51a40ca32c88e37b71280c753a44 100644 --- a/tests/test_failures.py +++ b/tests/test_failures.py @@ -38,6 +38,9 @@ class TestFailures(unittest.TestCase): def tearDown(self): self.amqp.shutdown() + def test_cancel_not_consumed_queue(self): + self.amqp.cancel(Queue('hello world')).result() + def test_connection_down_and_up(self): """Are ConnectionUp/Down messages generated at all? does it reconnect?""" os.system("sudo service rabbitmq-server restart") diff --git a/tests/test_noack.py b/tests/test_noack.py index f13448757793d07a0d54c324d4860cad5dec5c3a..e50d8babb4400c1a1de30f1679bce98cd379163f 100644 --- a/tests/test_noack.py +++ b/tests/test_noack.py @@ -3,16 +3,24 @@ from __future__ import absolute_import, division, print_function import unittest import six import os +import time from coolamqp import Cluster, ClusterNode, Queue, MessageReceived, ConnectionUp, ConnectionDown, ConsumerCancelled, Message, Exchange +class TestNoAcknowledge(unittest.TestCase): + def drainTo(self, type_, timeout=20): + start = time.time() + while time.time() - start < timeout: + q = self.amqp.drain(1) + if isinstance(q, type_): + return q + self.fail('Did not find %s' % (type_, )) -class TestNoAcknowledge(unittest.TestCase): def setUp(self): self.amqp = Cluster([ClusterNode('127.0.0.1', 'guest', 'guest')]) self.amqp.start() - self.assertIsInstance(self.amqp.drain(1), ConnectionUp) + self.drainTo(ConnectionUp, timeout=1) def tearDown(self): self.amqp.shutdown() @@ -28,9 +36,9 @@ class TestNoAcknowledge(unittest.TestCase): self.amqp.send(Message(b'what the fuck'), '', routing_key='myqueue') self.amqp.send(Message(b'what the fuck'), '', routing_key='myqueue') - self.assertIsInstance(self.amqp.drain(wait=1), MessageReceived) - self.assertIsInstance(self.amqp.drain(wait=0.3), MessageReceived) - self.assertIsInstance(self.amqp.drain(wait=0.3), MessageReceived) + self.drainTo(MessageReceived) + self.drainTo(MessageReceived) + self.drainTo(MessageReceived) def test_noack_works_after_restart(self): myq = Queue('myqueue', exclusive=True) @@ -51,50 +59,78 @@ class TestNoAcknowledge(unittest.TestCase): self.assertIsInstance(self.amqp.drain(wait=5), ConnectionDown) self.assertIsInstance(self.amqp.drain(wait=5), ConnectionUp) - self.amqp.send(Message(b'what the fuck'), '', routing_key='myqueue') - self.amqp.send(Message(b'what the fuck'), '', routing_key='myqueue') - self.amqp.send(Message(b'what the fuck'), '', routing_key='myqueue') + self.amqp.send(Message(b'what the fuck'), routing_key='myqueue') + self.amqp.send(Message(b'what the fuck'), routing_key='myqueue') + self.amqp.send(Message(b'what the fuck'), routing_key='myqueue') self.assertIsInstance(self.amqp.drain(wait=1), MessageReceived) self.assertIsInstance(self.amqp.drain(wait=0.3), MessageReceived) self.assertIsInstance(self.amqp.drain(wait=0.3), MessageReceived) def test_noack_coexists(self): - myq = Queue('myqueue', exclusive=True) - my2 = Queue('myqueue2', exclusive=True) - self.amqp.qos(0, 1, False) - self.amqp.consume(myq, no_ack=True) - self.amqp.consume(my2) - - msg = Message(b'') + self.amqp.consume(Queue('myqueue', exclusive=True), no_ack=True) + self.amqp.consume(Queue('myqueue2', exclusive=True)) - self.amqp.send(msg, routing_key='myqueue') - self.amqp.send(msg, routing_key='myqueue') - self.amqp.send(msg, routing_key='myqueue') + msg = Message(b'zz') - self.amqp.send(msg, routing_key='myqueue2') - self.amqp.send(msg, routing_key='myqueue2') - self.amqp.send(msg, routing_key='myqueue2').result() + for i in range(3): + self.amqp.send(msg, routing_key='myqueue') + self.amqp.send(msg, routing_key='myqueue2') - our_message = None + mq2s = [] for i in range(4): - mer = self.amqp.drain(wait=2) - self.assertIsInstance(mer, MessageReceived) + # I should have received 3 messages from myqueue, and 2 from myqueue2 + print('beng') + mer = self.drainTo(MessageReceived) if mer.message.routing_key == 'myqueue2': - self.assertIsNone(our_message) - our_message = mer - self.assertIsNotNone(our_message) + mq2s.append(mer.message) # Should receive nothing, since not acked - self.assertIsNone(self.amqp.drain(wait=2)) + self.assertIsNone(self.amqp.drain(wait=4)) + + self.assertEquals(len(mq2s), 1) # ack and receive - our_message.message.ack() + for me in mq2s: me.ack() mer = self.amqp.drain(wait=1) # 2nd self.assertIsInstance(mer, MessageReceived) mer.message.ack() mer = self.amqp.drain(wait=1) # 3rd self.assertIsInstance(mer, MessageReceived) mer.message.ack() + + @unittest.skip('demonstrates a py-amqp bug') + def test_noack_coexists_empty_message_body(self): + self.amqp.qos(0, 1, False) + + self.amqp.consume(Queue('myqueue', exclusive=True), no_ack=True) + self.amqp.consume(Queue('myqueue2', exclusive=True)) + + msg = Message(b'') # if this is empty, py-amqp fails + + for i in range(3): + self.amqp.send(msg, routing_key='myqueue') + self.amqp.send(msg, routing_key='myqueue2') + + mq2s = [] + for i in range(4): + # I should have received 3 messages from myqueue, and 2 from myqueue2 + mer = self.drainTo(MessageReceived) + if mer.message.routing_key == 'myqueue2': + mq2s.append(mer.message) + + # Should receive nothing, since not acked + self.assertIsNone(self.amqp.drain(wait=4)) + + self.assertEquals(len(mq2s), 1) + + # ack and receive + for me in mq2s: me.ack() + mer = self.amqp.drain(wait=1) # 2nd + self.assertIsInstance(mer, MessageReceived) + mer.message.ack() + mer = self.amqp.drain(wait=1) # 3rd + self.assertIsInstance(mer, MessageReceived) + mer.message.ack() \ No newline at end of file