diff --git a/README.md b/README.md index d82c4859f52b8115bbb10437ecb7d137f092b43b..1c7e049f56f12167f83f1d9e160f1b28cc7fcc2e 100644 --- a/README.md +++ b/README.md @@ -27,8 +27,12 @@ either on Vagrant (Vagrantfile attached) or Travis CI, and run against RabbitMQ. Enjoy! # Changelog - +## v0.12 +* ACCESS_REFUSED/RESOURCE_LOCKED on reconnect is properly handled +* reason for consumer cancel is provided +* can read error code and reply text from failed orders +* test suite refactored and improved ## v0.11 * added *no_ack* to *consume* * can pass other non-text types to Message -* can set global bit in *qos* \ No newline at end of file +* can set global bit in *qos* diff --git a/coolamqp/__init__.py b/coolamqp/__init__.py index de4950082928a5e00e62ca50df760115d4b9fb8b..64f889284517fbb688deda2350210370ee5285ff 100644 --- a/coolamqp/__init__.py +++ b/coolamqp/__init__.py @@ -1,5 +1,6 @@ # coding=UTF-8 -from .cluster import ClusterNode, Cluster -from .events import ConnectionDown, ConnectionUp, MessageReceived, ConsumerCancelled -from .messages import Message, Exchange, Queue +from coolamqp.cluster import ClusterNode, Cluster +from coolamqp.events import ConnectionDown, ConnectionUp, MessageReceived, ConsumerCancelled +from coolamqp.messages import Message, Exchange, Queue +from coolamqp.backends.base import Cancelled, Discarded diff --git a/coolamqp/backends/__init__.py b/coolamqp/backends/__init__.py index bc8399ac9463d55ddd37e8abe7dc25adc1ad0929..77106aaea6de30fafc10248afa1fc5d5ec8e89d0 100644 --- a/coolamqp/backends/__init__.py +++ b/coolamqp/backends/__init__.py @@ -1,3 +1,3 @@ # coding=UTF-8 -from .pyamqp import PyAMQPBackend -from .base import AMQPError, ConnectionFailedError, RemoteAMQPError, Cancelled +from coolamqp.backends.pyamqp import PyAMQPBackend +from coolamqp.backends.base import AMQPError, ConnectionFailedError, RemoteAMQPError, Cancelled diff --git a/coolamqp/backends/base.py b/coolamqp/backends/base.py index 7d4a25cb1bc7f83f3bbb849a3f28f0729dea7238..449ff89e5cea23af4c5d04aab35f104bdcbbb9a3 100644 --- a/coolamqp/backends/base.py +++ b/coolamqp/backends/base.py @@ -1,10 +1,24 @@ # coding=UTF-8 class AMQPError(Exception): """Connection errors and bawking of AMQP server""" + code = None + reply_text = 'AMQP error' + + def __repr__(self): + return u'AMQPError()' class ConnectionFailedError(AMQPError): """Connection to broker failed""" + reply_text = 'failed connecting to broker' + + def __repr__(self): + return u'ConnectionFailedError("%s")' % map(repr, (self.reply_text, )) + + +class Discarded(Exception): + """send() for this message had discard_on_retry""" + class Cancelled(Exception): """Cancel ordered by user""" @@ -21,7 +35,10 @@ class RemoteAMQPError(AMQPError): """ AMQPError.__init__(self, text) self.code = code + self.text = text or 'server sent back an error' + def __repr__(self): + return u'RemoteAMQPError(%s, %s)' % map(repr, (self.code, self.text)) class AMQPBackend(object): """ diff --git a/coolamqp/backends/pyamqp.py b/coolamqp/backends/pyamqp.py index 0b402eba5b4e820af58abe43f72ae86a6d91f999..826e2b3ce7b2eec3af5a1eeb2967cd784509fc7f 100644 --- a/coolamqp/backends/pyamqp.py +++ b/coolamqp/backends/pyamqp.py @@ -6,7 +6,7 @@ import socket import six import functools import logging -from .base import AMQPBackend, RemoteAMQPError, ConnectionFailedError +from coolamqp.backends.base import AMQPBackend, RemoteAMQPError, ConnectionFailedError import monotonic @@ -14,16 +14,24 @@ logger = logging.getLogger(__name__) def translate_exceptions(fun): - """Translates pyamqp's exceptions to CoolAMQP's""" + """ + Translates pyamqp's exceptions to CoolAMQP's + + py-amqp's exceptions are less than intuitive, so expect many special cases + """ @functools.wraps(fun) def q(*args, **kwargs): try: return fun(*args, **kwargs) - except amqp.RecoverableChannelError as e: + except (amqp.RecoverableChannelError, + amqp.exceptions.NotFound, + amqp.exceptions.AccessRefused) as e: raise RemoteAMQPError(e.reply_code, e.reply_text) - except (IOError, amqp.ConnectionForced, amqp.IrrecoverableChannelError, amqp.exceptions.UnexpectedFrame) as e: - msg = e.message if six.PY2 else e.args[0] - raise ConnectionFailedError(msg) + except (IOError, + amqp.ConnectionForced, + amqp.IrrecoverableChannelError, + amqp.exceptions.UnexpectedFrame) as e: + raise ConnectionFailedError(e.message if six.PY2 else e.args[0]) return q diff --git a/coolamqp/cluster.py b/coolamqp/cluster.py index 5a5fa5a6626b0bf668315abbe95b02121328e94e..1ee71eb9709e71ffe1bbc1185ba4ed0c5cf3e3d9 100644 --- a/coolamqp/cluster.py +++ b/coolamqp/cluster.py @@ -2,9 +2,9 @@ import itertools from six.moves import queue as Queue from coolamqp.backends import PyAMQPBackend -from .orders import SendMessage, ConsumeQueue, DeclareExchange, CancelQueue, DeleteQueue, \ - DeleteExchange, SetQoS, DeclareQueue -from .messages import Exchange +from coolamqp.orders import SendMessage, ConsumeQueue, DeclareExchange, CancelQueue, DeleteQueue, \ + DeleteExchange, SetQoS, DeclareQueue, Order +from coolamqp.messages import Exchange class ClusterNode(object): @@ -83,19 +83,28 @@ class Cluster(object): from .handler import ClusterHandlerThread self.thread = ClusterHandlerThread(self) - def send(self, message, exchange='', routing_key='', on_completed=None, on_failed=None): + def send(self, message, exchange='', routing_key='', discard_on_fail=False, on_completed=None, on_failed=None): """ Schedule a message to be sent. :param message: Message object to send. :param exchange: Exchange to use. Leave None to use the default exchange :param routing_key: routing key to use + :param discard_on_fail: if True, then message is valid for sending ONLY with current connection. + Will be discarded upon fail. :param on_completed: callable/0 to call when this succeeds :param on_failed: callable/1 to call when this fails with AMQPError instance :return: a Future with this order's status """ a = SendMessage(message, exchange or Exchange.direct, routing_key, + discard_on_fail=discard_on_fail, on_completed=on_completed, on_failed=on_failed) - self.thread.order_queue.append(a) + if discard_on_fail and self.thread.backend is None: + o = Order() + o.discarded = True + on_failed(Discarded()) + return o + # discard at once if no point in sending + self.thread.order_queue.append(a) return a def declare_exchange(self, exchange, on_completed=None, on_failed=None): diff --git a/coolamqp/events.py b/coolamqp/events.py index 1f42085cd9d7fd6ad35498901de6969d653b0395..0dbe85087adab5dc48779423a53bafe8842bb5e6 100644 --- a/coolamqp/events.py +++ b/coolamqp/events.py @@ -29,10 +29,25 @@ class MessageReceived(ClusterEvent): class ConsumerCancelled(ClusterEvent): - """Broker cancelled a consumer of ours. - This is also generated in response to cancelling consumption from a queue""" - def __init__(self, queue): + """ + Broker cancelled a consumer of ours. + This is also generated in response to cancelling consumption from a queue + """ + + BROKER_CANCEL = 0 + REFUSED_ON_RECONNECT = 1 + USER_CANCEL = 2 + + def __init__(self, queue, reason): """ :param queue: Queue whose consumer was cancelled + :param reason: Reason why the consumer was cancelled + ConsumerCancelled.BROKER_CANCEL - broker informed us about cancelling + ConsumerCancelled.REFUSED_ON_RECONNECT - during a reconnect, I tried to consume an exclusive + queue and got ACCESS_REFUSED. + These messages will arrive between ConnectionDown and + ConnectionUp. + ConsumedCancelled.USER_CANCEL - user called cluster.cancel() """ self.queue = queue + self.reason = reason diff --git a/coolamqp/handler.py b/coolamqp/handler.py index f026a8ae5a2baff022e00d183df32e905430a289..3645ba38d75483b0c6937bd7b6e792a48cf69095 100644 --- a/coolamqp/handler.py +++ b/coolamqp/handler.py @@ -68,11 +68,22 @@ class ClusterHandlerThread(threading.Thread): for exchange in self.declared_exchanges.values(): self.backend.exchange_declare(exchange) + failed_queues = [] for queue, no_ack in self.queues_by_consumer_tags.values(): - self.backend.queue_declare(queue) - if queue.exchange is not None: - self.backend.queue_bind(queue, queue.exchange) - self.backend.basic_consume(queue, no_ack=no_ack) + try: + self.backend.queue_declare(queue) + if queue.exchange is not None: + self.backend.queue_bind(queue, queue.exchange) + self.backend.basic_consume(queue, no_ack=no_ack) + except RemoteAMQPError as e: + if e.code in (403, 405): # access refused, resource locked + self.event_queue.put(ConsumerCancelled(queue, ConsumerCancelled.REFUSED_ON_RECONNECT)) + failed_queues.append(queue) + else: + raise + + for failed_queue in failed_queues: + del self.queues_by_consumer_tags[failed_queue.consumer_tag] except ConnectionFailedError as e: # a connection failure happened :( @@ -98,7 +109,8 @@ class ClusterHandlerThread(threading.Thread): try: if order.cancelled: - order.failed(Cancelled()) + logger.debug('Order %s was cancelled', order) + order._failed(Cancelled()) return if isinstance(order, SendMessage): @@ -120,7 +132,7 @@ class ClusterHandlerThread(threading.Thread): self.backend.queue_delete(order.queue) elif isinstance(order, ConsumeQueue): if order.queue.consumer_tag in self.queues_by_consumer_tags: - order.completed() + order._completed() return # already consuming, belay that self.backend.queue_declare(order.queue) @@ -137,7 +149,7 @@ class ClusterHandlerThread(threading.Thread): pass # wat? else: self.backend.basic_cancel(order.queue.consumer_tag) - self.event_queue.put(ConsumerCancelled(order.queue)) + self.event_queue.put(ConsumerCancelled(order.queue, ConsumerCancelled.USER_CANCEL)) elif isinstance(order, AcknowledgeMessage): if order.connect_id == self.connect_id: self.backend.basic_ack(order.delivery_tag) @@ -146,12 +158,12 @@ class ClusterHandlerThread(threading.Thread): self.backend.basic_reject(order.delivery_tag) except RemoteAMQPError as e: logger.error('Remote AMQP error: %s', e) - order.failed(e) # we are allowed to go on + order._failed(e) # we are allowed to go on except ConnectionFailedError: self.order_queue.appendleft(order) raise else: - order.completed() + order._completed() def __run_wrap(self): # throws _ImOuttaHere # Loop while there are things to do @@ -163,7 +175,7 @@ class ClusterHandlerThread(threading.Thread): # just drain shit self.backend.process(max_time=0.05) except ConnectionFailedError as e: - logger.warning('Connection to broker lost') + logger.warning('Connection to broker lost: %s', e) self.cluster.connected = False self.event_queue.put(ConnectionDown()) self._reconnect() @@ -213,7 +225,7 @@ class ClusterHandlerThread(threading.Thread): except KeyError: return # what? - self.event_queue.put(ConsumerCancelled(queue)) + self.event_queue.put(ConsumerCancelled(queue, ConsumerCancelled.BROKER_CANCEL)) ## methods to enqueue something into CHT to execute diff --git a/coolamqp/messages.py b/coolamqp/messages.py index c28e19f4d89122cb8f33c81c7bc26d9f9ca2c8e4..ee76e35bbf02f1c960d944e365d0eab32f06b49c 100644 --- a/coolamqp/messages.py +++ b/coolamqp/messages.py @@ -99,7 +99,8 @@ class Queue(object): def __init__(self, name='', durable=False, exchange=None, exclusive=False, auto_delete=False): """ - Create a queue definition + Create a queue definition. + :param name: name of the queue. Take special care if this is empty. If empty, this will be filled-in by the broker upon declaration. If a disconnect happens, and connection to other node is diff --git a/coolamqp/orders.py b/coolamqp/orders.py index 90271bf6f7d312f088d574d82e13f718c2e74347..877253f183bd1d01fd3fbbf6781979cc457f46a1 100644 --- a/coolamqp/orders.py +++ b/coolamqp/orders.py @@ -3,6 +3,7 @@ Orders that can be dispatched to ClusterHandlerThread """ from threading import Lock +import warnings _NOOP_COMP = lambda: None @@ -21,7 +22,13 @@ class Order(object): bye() then hello() will be called BEFORE bye(). - Callbacks are called from CoolAMQP's internal thread + Callbacks are called from CoolAMQP's internal thread. + + If this fails, then property .error_code can be read to get the error code. + and .reply_text has the reply of the server or some other reason. These are set before + callbacks are called. + Error code is None, if not available, or AMQP constants describing errors, + eg. 502 for syntax error. """ self.on_completed = on_completed or _NOOP_COMP self.on_failed = on_failed or _NOOP_FAIL @@ -31,7 +38,10 @@ class Order(object): # private self.lock = Lock() self.lock.acquire() - self.cancelled = False + self.cancelled = False #: public + self.discarded = False #: public + self.error_code = None + self.reply_text = None def has_finished(self): """Return if this task has either completed or failed""" @@ -41,24 +51,47 @@ class Order(object): """Cancel this order""" self.cancelled = True - def completed(self): + def _completed(self): # called by handler self._result = True self.on_completed() self.lock.release() - def failed(self, e): + def _failed(self, e): # called by handler """ :param e: AMQPError instance or Cancelled instance """ + from coolamqp.backends import Cancelled self._result = e + if not isinstance(e, Cancelled): # a true error + self.error_code = e.code + self.reply_text = e.reply_text + self.on_failed(e) self.lock.release() + def wait(self): + """Wait until this is completed and return whether the order succeeded""" + self.lock.acquire() + return self._result is True + + def has_failed(self): + """Return whether the operation failed, ie. completed but with an error code. + User-cancelled operations are not failed. + This assumes that this order has been .wait()ed upon""" + assert self._result is not None + return not (self.cancelled or self._result is True) + def result(self): """Wait until this is completed and return a response""" + warnings.warn('Use .wait() instead', PendingDeprecationWarning) self.lock.acquire() return self._result + @staticmethod + def _discarded(on_completed=None, on_failed=None): # return order for a discarded message + o = Order(on_completed=on_completed, on_failed=on_failed) + self.on_completed() + class SendMessage(Order): """Send a message""" diff --git a/tests/test_basics.py b/tests/test_basics.py index c777f4e1c8a6234b1d65eff32ebf7de751ed3639..70f19b1d212ff53b069b59d0694fecf1782d0823 100644 --- a/tests/test_basics.py +++ b/tests/test_basics.py @@ -1,18 +1,14 @@ # coding=UTF-8 from __future__ import absolute_import, division, print_function -import unittest import six from coolamqp import Cluster, ClusterNode, Queue, MessageReceived, ConnectionUp, ConsumerCancelled, Message, Exchange +from tests.utils import getamqp, CoolAMQPTestCase -def getamqp(): - amqp = Cluster([ClusterNode('127.0.0.1', 'guest', 'guest')]) - amqp.start() - return amqp +class TestThings(CoolAMQPTestCase): + INIT_AMQP = False - -class TestThings(unittest.TestCase): def test_different_constructor_for_clusternode(self): cn = ClusterNode(host='127.0.0.1', user='guest', password='guest', virtual_host='/') amqp = Cluster([cn]) @@ -20,13 +16,8 @@ class TestThings(unittest.TestCase): self.assertIsInstance(amqp.drain(1), ConnectionUp) amqp.shutdown() -class TestBasics(unittest.TestCase): - def setUp(self): - self.amqp = getamqp() - self.assertIsInstance(self.amqp.drain(2), ConnectionUp) - def tearDown(self): - self.amqp.shutdown() +class TestBasics(CoolAMQPTestCase): def test_acknowledge(self): myq = Queue('myqueue', exclusive=True) @@ -34,8 +25,7 @@ class TestBasics(unittest.TestCase): self.amqp.consume(myq) self.amqp.send(Message(b'what the fuck'), '', routing_key='myqueue') - p = self.amqp.drain(wait=1) - self.assertIsInstance(p, MessageReceived) + p = self.drainTo(MessageReceived, 4) self.assertEquals(p.message.body, b'what the fuck') self.assertIsInstance(p.message.body, six.binary_type) p.message.ack() @@ -54,13 +44,11 @@ class TestBasics(unittest.TestCase): self.amqp.consume(myq) self.amqp.send(Message(b'what the fuck'), '', routing_key='myqueue') - p = self.amqp.drain(wait=4) - self.assertIsInstance(p, MessageReceived) + p = self.drainTo(MessageReceived, 4) self.assertEquals(p.message.body, b'what the fuck') p.message.nack() - p = self.amqp.drain(wait=4) - self.assertIsInstance(p, MessageReceived) + p = self.drainTo(MessageReceived, 4) self.assertEquals(p.message.body, b'what the fuck') def test_bug_hangs(self): @@ -70,16 +58,14 @@ class TestBasics(unittest.TestCase): def test_consume_declare(self): """Spawn a second connection. One declares an exclusive queue, other tries to consume from it""" - amqp2 = getamqp() - - has_failed = {'has_failed': False} + with self.new_amqp_connection() as amqp2: - self.amqp.declare_queue(Queue('lol', exclusive=True)).result() - amqp2.consume(Queue('lol', exclusive=True), on_failed=lambda e: has_failed.update({'has_failed': True})).result() + has_failed = {'has_failed': False} - self.assertTrue(has_failed['has_failed']) + self.amqp.declare_queue(Queue('lol', exclusive=True)).result() + amqp2.consume(Queue('lol', exclusive=True), on_failed=lambda e: has_failed.update({'has_failed': True})).result() - amqp2.shutdown() + self.assertTrue(has_failed['has_failed']) def test_qos(self): self.amqp.qos(0, 1) @@ -88,25 +74,22 @@ class TestBasics(unittest.TestCase): self.amqp.send(Message(b'what the fuck'), '', routing_key='lol') self.amqp.send(Message(b'what the fuck'), '', routing_key='lol') - p = self.amqp.drain(wait=4) - self.assertIsInstance(p, MessageReceived) + p = self.drainTo(MessageReceived, 4) - self.assertIsNone(self.amqp.drain(wait=5)) + self.drainToNone(5) p.message.ack() self.assertIsInstance(self.amqp.drain(wait=4), MessageReceived) def test_consume_twice(self): """Spawn a second connection and try to consume an exclusive queue twice""" - amqp2 = getamqp() + with self.new_amqp_connection() as amqp2: - has_failed = {'has_failed': False} - - self.amqp.consume(Queue('lol', exclusive=True)).result() - amqp2.consume(Queue('lol', exclusive=True), on_failed=lambda e: has_failed.update({'has_failed': True})).result() + has_failed = {'has_failed': False} - self.assertTrue(has_failed['has_failed']) + self.amqp.consume(Queue('lol', exclusive=True)).result() + amqp2.consume(Queue('lol', exclusive=True), on_failed=lambda e: has_failed.update({'has_failed': True})).result() - amqp2.shutdown() + self.assertTrue(has_failed['has_failed']) def test_send_and_receive(self): myq = Queue('myqueue', exclusive=True) @@ -114,9 +97,7 @@ class TestBasics(unittest.TestCase): self.amqp.consume(myq) self.amqp.send(Message(b'what the fuck'), '', routing_key='myqueue') - p = self.amqp.drain(wait=10) - self.assertIsInstance(p, MessageReceived) - self.assertEquals(p.message.body, b'what the fuck') + self.assertEquals(self.drainTo(MessageReceived, 3).message.body, b'what the fuck') def test_consumer_cancelled_on_queue_deletion(self): myq = Queue('myqueue', exclusive=True) @@ -124,7 +105,7 @@ class TestBasics(unittest.TestCase): self.amqp.consume(myq) self.amqp.delete_queue(myq) - self.assertIsInstance(self.amqp.drain(wait=10), ConsumerCancelled) + self.assertEquals(self.drainTo(ConsumerCancelled, 5).reason, ConsumerCancelled.BROKER_CANCEL) def test_consumer_cancelled_on_consumer_cancel(self): myq = Queue('myqueue', exclusive=True) @@ -132,7 +113,8 @@ class TestBasics(unittest.TestCase): self.amqp.consume(myq) self.amqp.cancel(myq) - self.assertIsInstance(self.amqp.drain(wait=10), ConsumerCancelled) + c = self.drainTo(ConsumerCancelled, 10) + self.assertEquals(c.reason, ConsumerCancelled.USER_CANCEL) def test_delete_exchange(self): xchg = Exchange('a_fanout', type='fanout') @@ -151,5 +133,5 @@ class TestBasics(unittest.TestCase): self.amqp.send(Message(b'hello'), xchg) - self.assertIsInstance(self.amqp.drain(wait=4), MessageReceived) - self.assertIsInstance(self.amqp.drain(wait=4), MessageReceived) + self.drainTo(MessageReceived, 4) + self.drainTo(MessageReceived, 4) diff --git a/tests/test_failures.py b/tests/test_failures.py index f091b41c339f51a40ca32c88e37b71280c753a44..029ec2d176fd33534ffa9a94d16bef78809bb0c7 100644 --- a/tests/test_failures.py +++ b/tests/test_failures.py @@ -10,6 +10,8 @@ from coolamqp import Cluster, ClusterNode, Queue, MessageReceived, ConnectionUp, NODE = ClusterNode('127.0.0.1', 'guest', 'guest') +from tests.utils import CoolAMQPTestCase + class TestSpecialCases(unittest.TestCase): def test_termination_while_disconnect(self): @@ -28,47 +30,30 @@ class TestSpecialCases(unittest.TestCase): os.system("sudo service rabbitmq-server start") -class TestFailures(unittest.TestCase): - - def setUp(self): - self.amqp = Cluster([NODE]) - self.amqp.start() - self.assertIsInstance(self.amqp.drain(1), ConnectionUp) - - def tearDown(self): - self.amqp.shutdown() +class TestFailures(CoolAMQPTestCase): def test_cancel_not_consumed_queue(self): self.amqp.cancel(Queue('hello world')).result() - def test_connection_down_and_up(self): - """Are ConnectionUp/Down messages generated at all? does it reconnect?""" - os.system("sudo service rabbitmq-server restart") - self.assertIsInstance(self.amqp.drain(wait=6), ConnectionDown) - self.assertIsInstance(self.amqp.drain(wait=10), ConnectionUp) - def test_longer_disconnects(self): os.system("sudo service rabbitmq-server stop") - self.assertIsInstance(self.amqp.drain(wait=4), ConnectionDown) + self.drainTo(ConnectionDown, 4) time.sleep(12) os.system("sudo service rabbitmq-server start") - self.assertIsInstance(self.amqp.drain(wait=30), ConnectionUp) + self.drainTo(ConnectionUp, 35) def test_qos_redeclared_on_fail(self): self.amqp.qos(0, 1).result() - os.system("sudo service rabbitmq-server restart") - self.assertIsInstance(self.amqp.drain(wait=4), ConnectionDown) - self.assertIsInstance(self.amqp.drain(wait=10), ConnectionUp) + self.restart_rmq() self.amqp.consume(Queue('lol', exclusive=True)).result() self.amqp.send(Message(b'what the fuck'), '', routing_key='lol') self.amqp.send(Message(b'what the fuck'), '', routing_key='lol') - p = self.amqp.drain(wait=4) - self.assertIsInstance(p, MessageReceived) + p = self.drainTo(MessageReceived, 4) - self.assertIs(self.amqp.drain(wait=5), None) + self.drainToNone(5) p.message.ack() self.assertIsInstance(self.amqp.drain(wait=4), MessageReceived) @@ -77,24 +62,24 @@ class TestFailures(unittest.TestCase): self.assertIsInstance(self.amqp.drain(wait=4), ConnectionDown) self.assertFalse(self.amqp.connected) os.system("sudo service rabbitmq-server start") - self.assertIsInstance(self.amqp.drain(wait=10), ConnectionUp) + self.assertIsInstance(self.amqp.drain(wait=20), ConnectionUp) self.assertTrue(self.amqp.connected) def test_sending_a_message_is_cancelled(self): """are messages generated at all? does it reconnect?""" - q1 = Queue('wtf1', exclusive=True) - self.amqp.consume(q1).result() + self.amqp.consume(Queue('wtf1', exclusive=True)) os.system("sudo service rabbitmq-server stop") - self.assertIsInstance(self.amqp.drain(wait=4), ConnectionDown) - result = self.amqp.send(Message(b'what the fuck'), '', routing_key='wtf1') - result.cancel() + self.drainTo(ConnectionDown, 5) - os.system("sudo service rabbitmq-server start") + p = self.amqp.send(Message(b'what the fuck'), routing_key='wtf1') + p.cancel() + self.assertTrue(p.wait()) + self.assertFalse(p.has_failed()) - self.assertIsInstance(self.amqp.drain(wait=10), ConnectionUp) - self.assertIsNone(self.amqp.drain(wait=6)) # message is NOT received + os.system("sudo service rabbitmq-server start") + self.drainToAny([ConnectionUp], 30, forbidden=[MessageReceived]) def test_qos_after_failure(self): self.amqp.qos(0, 1) @@ -110,19 +95,16 @@ class TestFailures(unittest.TestCase): p.message.ack() self.assertIsInstance(self.amqp.drain(wait=4), MessageReceived) - os.system("sudo service rabbitmq-server restart") - self.assertIsInstance(self.amqp.drain(wait=6), ConnectionDown) - self.assertIsInstance(self.amqp.drain(wait=6), ConnectionUp) + self.restart_rmq() self.amqp.send(Message(b'what the fuck'), '', routing_key='lol') self.amqp.send(Message(b'what the fuck'), '', routing_key='lol') - p = self.amqp.drain(wait=4) - self.assertIsInstance(p, MessageReceived) + p = self.drainTo(MessageReceived, 4) - self.assertIsNone(self.amqp.drain(wait=5)) + self.drainToNone(5) p.message.ack() - self.assertIsInstance(self.amqp.drain(wait=4), MessageReceived) + self.drainTo(MessageReceived, 4) def test_connection_down_and_up_redeclare_queues(self): """are messages generated at all? does it reconnect?""" @@ -130,13 +112,11 @@ class TestFailures(unittest.TestCase): q1 = Queue('wtf1', exclusive=True) self.amqp.consume(q1).result() - os.system("sudo service rabbitmq-server restart") - self.assertIsInstance(self.amqp.drain(wait=4), ConnectionDown) - self.assertIsInstance(self.amqp.drain(wait=6), ConnectionUp) + self.restart_rmq() - self.amqp.send(Message(b'what the fuck'), '', routing_key='wtf1') + self.amqp.send(Message(b'what the fuck'), routing_key='wtf1') - self.assertIsInstance(self.amqp.drain(wait=10), MessageReceived) + self.drainTo(MessageReceived, 20) def test_exchanges_are_redeclared(self): xchg = Exchange('a_fanout', type='fanout') @@ -148,10 +128,19 @@ class TestFailures(unittest.TestCase): self.amqp.consume(q1) self.amqp.consume(q2).result() - os.system('sudo service rabbitmq-server restart') + self.restart_rmq() self.amqp.send(Message(b'hello'), xchg) - self.assertIsInstance(self.amqp.drain(wait=4), ConnectionDown) - self.assertIsInstance(self.amqp.drain(wait=10), ConnectionUp) - self.assertIsInstance(self.amqp.drain(wait=4), MessageReceived) - self.assertIsInstance(self.amqp.drain(wait=4), MessageReceived) + self.drainTo([MessageReceived, MessageReceived], 20) + + def test_consuming_exclusive_queue(self): + # declare and eat + q1 = Queue('q1', exclusive=True) + + self.amqp.consume(q1).wait() + + with self.new_amqp_connection() as amqp2: + q2 = Queue('q1', exclusive=True) + + r = amqp2.consume(q2) + self.assertFalse(r.wait()) diff --git a/tests/test_noack.py b/tests/test_noack.py index 7a674f8d0f3cea364fd78b02589b9346902f524f..4d0a3669a3b79bf75b4e5ae23f2769870d401236 100644 --- a/tests/test_noack.py +++ b/tests/test_noack.py @@ -1,30 +1,13 @@ # coding=UTF-8 from __future__ import absolute_import, division, print_function -import unittest import six import os import time +from tests.utils import CoolAMQPTestCase from coolamqp import Cluster, ClusterNode, Queue, MessageReceived, ConnectionUp, ConnectionDown, ConsumerCancelled, Message, Exchange -class TestNoAcknowledge(unittest.TestCase): - - def drainTo(self, type_, timeout=20): - start = time.time() - while time.time() - start < timeout: - q = self.amqp.drain(1) - if isinstance(q, type_): - return q - self.fail('Did not find %s' % (type_, )) - - def setUp(self): - self.amqp = Cluster([ClusterNode('127.0.0.1', 'guest', 'guest')]) - self.amqp.start() - self.drainTo(ConnectionUp, timeout=1) - - def tearDown(self): - self.amqp.shutdown() - +class TestNoAcknowledge(CoolAMQPTestCase): def test_noack_works(self): myq = Queue('myqueue', exclusive=True) @@ -36,9 +19,7 @@ class TestNoAcknowledge(unittest.TestCase): self.amqp.send(Message(b'what the fuck'), '', routing_key='myqueue') self.amqp.send(Message(b'what the fuck'), '', routing_key='myqueue') - self.drainTo(MessageReceived) - self.drainTo(MessageReceived) - self.drainTo(MessageReceived) + self.drainTo([MessageReceived, MessageReceived, MessageReceived], [3, 3, 3]) def test_noack_works_after_restart(self): myq = Queue('myqueue', exclusive=True) @@ -51,21 +32,15 @@ class TestNoAcknowledge(unittest.TestCase): self.amqp.send(Message(b'what the fuck'), '', routing_key='myqueue') self.amqp.send(Message(b'what the fuck'), '', routing_key='myqueue') - self.assertIsInstance(self.amqp.drain(wait=1), MessageReceived) - self.assertIsInstance(self.amqp.drain(wait=0.3), MessageReceived) - self.assertIsInstance(self.amqp.drain(wait=0.3), MessageReceived) + self.drainTo([MessageReceived, MessageReceived, MessageReceived], [3, 3, 3]) - os.system("sudo service rabbitmq-server restart") - self.assertIsInstance(self.amqp.drain(wait=5), ConnectionDown) - self.assertIsInstance(self.amqp.drain(wait=5), ConnectionUp) + self.restart_rmq() self.amqp.send(Message(b'what the fuck'), routing_key='myqueue') self.amqp.send(Message(b'what the fuck'), routing_key='myqueue') self.amqp.send(Message(b'what the fuck'), routing_key='myqueue') - self.assertIsInstance(self.amqp.drain(wait=1), MessageReceived) - self.assertIsInstance(self.amqp.drain(wait=0.3), MessageReceived) - self.assertIsInstance(self.amqp.drain(wait=0.3), MessageReceived) + self.drainTo([MessageReceived, MessageReceived, MessageReceived], [3, 3, 3]) def test_noack_coexists(self): self.amqp.qos(0, 1, False) @@ -94,12 +69,8 @@ class TestNoAcknowledge(unittest.TestCase): # ack and receive for me in mq2s: me.ack() - mer = self.amqp.drain(wait=1) # 2nd - self.assertIsInstance(mer, MessageReceived) - mer.message.ack() - mer = self.amqp.drain(wait=1) # 3rd - self.assertIsInstance(mer, MessageReceived) - mer.message.ack() + self.drainTo(MessageReceived, 1).message.ack() # 2nd + self.drainTo(MessageReceived, 1).message.ack() # 3rd @unittest.skip('demonstrates a py-amqp bug') def test_noack_coexists_empty_message_body(self): diff --git a/tests/test_performance.py b/tests/test_performance.py index 9ae570422aebbf5a3772b15501eaecf0f2ae5e2a..d5bc651ccd7b8f0c9380f450ec0e783111cadc8b 100644 --- a/tests/test_performance.py +++ b/tests/test_performance.py @@ -1,6 +1,6 @@ # coding=UTF-8 from __future__ import absolute_import, division, print_function -import unittest +from tests.utils import CoolAMQPTestCase import six import time @@ -8,7 +8,7 @@ from coolamqp import Cluster, ClusterNode, Queue, MessageReceived, ConnectionUp, ConnectionDown, ConsumerCancelled, Message, Exchange -class TestBasics(unittest.TestCase): +class TestBasics(CoolAMQPTestCase): def setUp(self): self.amqp = Cluster([ClusterNode('127.0.0.1', 'guest', 'guest')]) self.amqp.start() @@ -17,28 +17,6 @@ class TestBasics(unittest.TestCase): def tearDown(self): self.amqp.shutdown() - def takes_less_than(self, max_time): - """ - Tests that your code executes in less time than specified value. - Use like: - - with self.takes_less_than(0.9): - my_operation() - - :param max_time: in seconds - """ - test = self - - class CM(object): - def __enter__(self): - self.started_at = time.time() - - def __exit__(self, tp, v, tb): - test.assertLess(time.time() - self.started_at, max_time) - return False - - return CM() - def test_sending_a_message(self): with self.takes_less_than(0.5): diff --git a/tests/utils.py b/tests/utils.py new file mode 100644 index 0000000000000000000000000000000000000000..49e8565b09ee5f5703f2d3d3cf218e5f5b6b9c9c --- /dev/null +++ b/tests/utils.py @@ -0,0 +1,129 @@ +# coding=UTF-8 +from __future__ import absolute_import, division, print_function +import unittest +from threading import Lock +import time +import collections +import os +import monotonic + +from coolamqp import Cluster, ClusterNode, ConnectionUp, ConnectionDown, ConnectionUp, ConsumerCancelled + + +def getamqp(): + amqp = Cluster([ClusterNode('127.0.0.1', 'guest', 'guest')]) + amqp.start() + return amqp + + +class CoolAMQPTestCase(unittest.TestCase): + """ + Base class for all CoolAMQP tests. Creates na AMQP connection, provides methods + for easy interfacing, and other utils. + """ + INIT_AMQP = True # override on child classes + + + def new_amqp_connection(self, consume_connectionup=True): + obj = self + + class CM(object): + """Context manager. Get new AMQP uplink. Consume ConnectionUp if consume_connectionup + + Use like: + + with self.new_amqp_connection() as amqp2: + amqp2.consume(...) + + """ + def __enter__(self): + self.amqp = getamqp() + if consume_connectionup: + obj.assertIsInstance(self.amqp.drain(3), ConnectionUp) + return self.amqp + + def __exit__(self, exc_type, exc_val, exc_tb): + self.amqp.shutdown() + return False + return CM() + def restart_rmq(self): + # forcibly reset the connection + class FailbowlSocket(object): + def __getattr__(self, name): + import socket + raise socket.error() + + self.amqp.thread.backend.channel.connection.transport.sock = FailbowlSocket() + + self.drainTo([ConnectionDown, ConnectionUp], [5, 10]) + + def setUp(self): + if self.INIT_AMQP: + os.system('sudo service rabbitmq-server start') # if someone killed it + self.__newam = self.new_amqp_connection() + self.amqp = self.__newam.__enter__() + + def tearDown(self): + if self.INIT_AMQP: + self.__newam.__exit__(None, None, None) + + def drainToNone(self, timeout=4): + self.assertIsNone(self.amqp.drain(4)) + + def drainToAny(self, types, timeout, forbidden=[]): + """Assert that messages with types, in any order, are found within timeout. + Fail if any type from forbidden is found""" + start = monotonic.monotonic() + types = set(types) + while monotonic.monotonic() - start < timeout: + q = self.amqp.drain(1) + if type(q) in forbidden: + self.fail('%s found', type(q)) + if type(q) in types: + types.remove(type(q)) + if len(types) > 0: + self.fail('Not found %s' % (''.join(map(str, types)), )) + + def drainTo(self, type_, timeout, forbidden=[ConsumerCancelled]): + """ + Return next event of type_. It has to occur within timeout, or fail. + + If you pass iterable (len(type_) == len(timeout), last result will be returned + and I will drainTo() in order. + """ + if isinstance(type_, collections.Iterable): + self.assertIsInstance(timeout, collections.Iterable) + for tp, ti in zip(type_, timeout): + p = self.drainTo(tp, ti) + if type(p) in forbidden: + self.fail('Found %s but forbidden', type(p)) + return p + + start = monotonic.monotonic() + while monotonic.monotonic() - start < timeout: + q = self.amqp.drain(1) + if isinstance(q, type_): + return q + self.fail('Did not find %s' % (type_, )) + + def takes_less_than(self, max_time): + """ + Tests that your code executes in less time than specified value. + Use like: + + with self.takes_less_than(0.9): + my_operation() + + :param max_time: in seconds + """ + test = self + + class CM(object): + def __enter__(self): + self.started_at = time.time() + + def __exit__(self, tp, v, tb): + test.assertLess(time.time() - self.started_at, max_time) + return False + + return CM()