diff --git a/coolamqp/backends/base.py b/coolamqp/backends/base.py index ff0f02153774265451f7975d7dacea215a5a5f11..9cb7636b3d7a0fb975341079085183160bfc536e 100644 --- a/coolamqp/backends/base.py +++ b/coolamqp/backends/base.py @@ -86,10 +86,11 @@ class AMQPBackend(object): :param consumer_tag: consumer_tag to cancel """ - def basic_consume(self, queue): + def basic_consume(self, queue, no_ack=False): """ Start consuming from a queue :param queue: Queue object + :param no_ack: Messages will not need to be ack()ed for this queue """ def basic_ack(self, delivery_tag): diff --git a/coolamqp/backends/pyamqp.py b/coolamqp/backends/pyamqp.py index 4df7edcdf3011e390840bcf519a27e3fc7de1b0e..c06abdcebc972f256debed54302ecd0a7bdb0813 100644 --- a/coolamqp/backends/pyamqp.py +++ b/coolamqp/backends/pyamqp.py @@ -128,7 +128,7 @@ class PyAMQPBackend(AMQPBackend): queue.name = qname @translate_exceptions - def basic_consume(self, queue): + def basic_consume(self, queue, no_ack=False): """ Start consuming from a queue :param queue: Queue object @@ -136,6 +136,7 @@ class PyAMQPBackend(AMQPBackend): self.channel.basic_consume(queue.name, consumer_tag=queue.consumer_tag, exclusive=queue.exclusive, + no_ack=no_ack, callback=self.__on_message, on_cancel=self.__on_consumercancelled) diff --git a/coolamqp/cluster.py b/coolamqp/cluster.py index 5d5bb5c19efe225f4d9043707464fa5c6842e2f7..ffdd05805e5f7adbe19c4b016e57ef4e4d06a597 100644 --- a/coolamqp/cluster.py +++ b/coolamqp/cluster.py @@ -169,7 +169,7 @@ class Cluster(object): self.thread.order_queue.append(a) return a - def consume(self, queue, on_completed=None, on_failed=None): + def consume(self, queue, no_ack=False, on_completed=None, on_failed=None): """ Start consuming from a queue @@ -178,10 +178,11 @@ class Cluster(object): :param queue: Queue to consume from :param on_completed: callable/0 to call when this succeeds + :param no_ack: if True, you will not need to call .ack() for this queue :param on_failed: callable/1 to call when this fails with AMQPError instance :return: a Future with this order's status """ - a = ConsumeQueue(queue, on_completed=on_completed, on_failed=on_failed) + a = ConsumeQueue(queue, no_ack=no_ack, on_completed=on_completed, on_failed=on_failed) self.thread.order_queue.append(a) return a diff --git a/coolamqp/handler.py b/coolamqp/handler.py index f8236396d1fa264ec613a50fae6adce0ddc30f66..8ac0041b443e9321e7dd8aca395dafa220b3a0b5 100644 --- a/coolamqp/handler.py +++ b/coolamqp/handler.py @@ -39,7 +39,7 @@ class ClusterHandlerThread(threading.Thread): self.connect_id = -1 # connectID of current connection self.declared_exchanges = {} # declared exchanges, by their names - self.queues_by_consumer_tags = {} # subbed queues, by their consumer tags + self.queues_by_consumer_tags = {} # tuple of (subbed queue, no_ack::bool), by consumer tags self.backend = None self.first_connect = True @@ -67,11 +67,11 @@ class ClusterHandlerThread(threading.Thread): for exchange in self.declared_exchanges.values(): self.backend.exchange_declare(exchange) - for queue in self.queues_by_consumer_tags.values(): + for queue, no_ack in self.queues_by_consumer_tags.values(): self.backend.queue_declare(queue) if queue.exchange is not None: self.backend.queue_bind(queue, queue.exchange) - self.backend.basic_consume(queue) + self.backend.basic_consume(queue, no_ack=no_ack) except ConnectionFailedError as e: # a connection failure happened :( @@ -126,11 +126,11 @@ class ClusterHandlerThread(threading.Thread): if order.queue.exchange is not None: self.backend.queue_bind(order.queue, order.queue.exchange) - self.backend.basic_consume(order.queue) - self.queues_by_consumer_tags[order.queue.consumer_tag] = order.queue + self.backend.basic_consume(order.queue, no_ack=order.no_ack) + self.queues_by_consumer_tags[order.queue.consumer_tag] = order.queue, order.no_ack elif isinstance(order, CancelQueue): try: - q = self.queues_by_consumer_tags.pop(order.queue.consumer_tag) + q, no_ack = self.queues_by_consumer_tags.pop(order.queue.consumer_tag) except KeyError: pass # wat? else: @@ -209,7 +209,7 @@ class ClusterHandlerThread(threading.Thread): A consumer has been cancelled """ try: - queue = self.queues_by_consumer_tags.pop(consumer_tag) + queue, no_ack = self.queues_by_consumer_tags.pop(consumer_tag) except KeyError: return # what? diff --git a/coolamqp/orders.py b/coolamqp/orders.py index 53cbb65bfab3e60b440ca32f901aca6a731f0504..ddcaa19052d65060937a188f58577d52139bdb02 100644 --- a/coolamqp/orders.py +++ b/coolamqp/orders.py @@ -93,6 +93,9 @@ class DeclareQueue(_Queue): class ConsumeQueue(_Queue): """Declare and consume from a queue""" + def __init__(self, queue, no_ack=False, on_completed=None, on_failed=None): + _Queue.__init__(self, queue, on_completed=on_completed, on_failed=on_failed) + self.no_ack = no_ack class DeleteQueue(_Queue): """Delete a queue""" diff --git a/setup.py b/setup.py index c87da9d80beda730f2511a32880b5ab70c2b557d..699839ae4a6fb218c4a82efc302bd33695ea4c76 100644 --- a/setup.py +++ b/setup.py @@ -3,12 +3,12 @@ from setuptools import setup setup(name='CoolAMQP', - version='0.10', + version='0.11', description='AMQP client with sane reconnects', author='DMS Serwis s.c.', author_email='piotrm@smok.co', url='https://github.com/smok-serwis/coolamqp', - download_url='https://github.com/smok-serwis/coolamqp/archive/v0.10.zip', + download_url='https://github.com/smok-serwis/coolamqp/archive/v0.11.zip', keywords=['amqp', 'pyamqp', 'rabbitmq', 'client', 'network', 'ha', 'high availability'], packages=[ 'coolamqp', diff --git a/tests/test_failures.py b/tests/test_failures.py index 5eb6a96401d0ce672bf2032f6131155873bbd34d..2349fc922da395f0b683318e2f89a26dc4d4baf6 100644 --- a/tests/test_failures.py +++ b/tests/test_failures.py @@ -93,6 +93,33 @@ class TestFailures(unittest.TestCase): self.assertIsInstance(self.amqp.drain(wait=10), ConnectionUp) self.assertIsNone(self.amqp.drain(wait=6)) # message is NOT received + def test_qos_after_failure(self): + self.amqp.qos(0, 1) + + self.amqp.consume(Queue('lol', exclusive=True)).result() + self.amqp.send(Message(b'what the fuck'), '', routing_key='lol') + self.amqp.send(Message(b'what the fuck'), '', routing_key='lol') + + p = self.amqp.drain(wait=4) + self.assertIsInstance(p, MessageReceived) + + self.assertIsNone(self.amqp.drain(wait=5)) + p.message.ack() + self.assertIsInstance(self.amqp.drain(wait=4), MessageReceived) + + os.system("sudo service rabbitmq-server restart") + self.assertIsInstance(self.amqp.drain(wait=6), ConnectionDown) + self.assertIsInstance(self.amqp.drain(wait=6), ConnectionUp) + + self.amqp.send(Message(b'what the fuck'), '', routing_key='lol') + self.amqp.send(Message(b'what the fuck'), '', routing_key='lol') + + p = self.amqp.drain(wait=4) + self.assertIsInstance(p, MessageReceived) + + self.assertIsNone(self.amqp.drain(wait=5)) + p.message.ack() + self.assertIsInstance(self.amqp.drain(wait=4), MessageReceived) def test_connection_down_and_up_redeclare_queues(self): """are messages generated at all? does it reconnect?""" diff --git a/tests/test_noack.py b/tests/test_noack.py new file mode 100644 index 0000000000000000000000000000000000000000..440aaf8ae4578f6ea35852e6f6b014fa6d8ce5be --- /dev/null +++ b/tests/test_noack.py @@ -0,0 +1,98 @@ +# coding=UTF-8 +from __future__ import absolute_import, division, print_function +import unittest +import six +import os + +from coolamqp import Cluster, ClusterNode, Queue, MessageReceived, ConnectionUp, ConnectionDown, ConsumerCancelled, Message, Exchange + + + +class TestNoAcknowledge(unittest.TestCase): + def setUp(self): + self.amqp = Cluster([ClusterNode('127.0.0.1', 'guest', 'guest')]) + self.amqp.start() + self.assertIsInstance(self.amqp.drain(1), ConnectionUp) + + def tearDown(self): + self.amqp.shutdown() + + def test_noack_works(self): + myq = Queue('myqueue', exclusive=True) + + self.amqp.qos(0,1 ) + + self.amqp.consume(myq, no_ack=True) + + self.amqp.send(Message(b'what the fuck'), '', routing_key='myqueue') + self.amqp.send(Message(b'what the fuck'), '', routing_key='myqueue') + self.amqp.send(Message(b'what the fuck'), '', routing_key='myqueue') + + self.assertIsInstance(self.amqp.drain(wait=1), MessageReceived) + self.assertIsInstance(self.amqp.drain(wait=0.3), MessageReceived) + self.assertIsInstance(self.amqp.drain(wait=0.3), MessageReceived) + + def test_noack_works_after_restart(self): + myq = Queue('myqueue', exclusive=True) + + self.amqp.qos(0,1 ) + + self.amqp.consume(myq, no_ack=True) + + self.amqp.send(Message(b'what the fuck'), '', routing_key='myqueue') + self.amqp.send(Message(b'what the fuck'), '', routing_key='myqueue') + self.amqp.send(Message(b'what the fuck'), '', routing_key='myqueue') + + self.assertIsInstance(self.amqp.drain(wait=1), MessageReceived) + self.assertIsInstance(self.amqp.drain(wait=0.3), MessageReceived) + self.assertIsInstance(self.amqp.drain(wait=0.3), MessageReceived) + + os.system("sudo service rabbitmq-server restart") + self.assertIsInstance(self.amqp.drain(wait=5), ConnectionDown) + self.assertIsInstance(self.amqp.drain(wait=5), ConnectionUp) + + self.amqp.send(Message(b'what the fuck'), '', routing_key='myqueue') + self.amqp.send(Message(b'what the fuck'), '', routing_key='myqueue') + self.amqp.send(Message(b'what the fuck'), '', routing_key='myqueue') + + self.assertIsInstance(self.amqp.drain(wait=1), MessageReceived) + self.assertIsInstance(self.amqp.drain(wait=0.3), MessageReceived) + self.assertIsInstance(self.amqp.drain(wait=0.3), MessageReceived) + + + def test_noack_coexists(self): + myq = Queue('myqueue', exclusive=True) + my2 = Queue('myqueue2', exclusive=True) + + self.amqp.qos(0, 1) + + self.amqp.consume(myq, no_ack=True) + self.amqp.consume(my2) + + self.amqp.send(Message(b'what the fuck'), '', routing_key='myqueue') + self.amqp.send(Message(b'what the fuck'), '', routing_key='myqueue') + self.amqp.send(Message(b'what the fuck'), '', routing_key='myqueue') + + self.amqp.send(Message(b'what the fuck'), '', routing_key='myqueue2') + self.amqp.send(Message(b'what the fuck'), '', routing_key='myqueue2') + self.amqp.send(Message(b'what the fuck'), '', routing_key='myqueue2') + + our_message = None + for i in xrange(0, 4): + mer = self.amqp.drain(wait=1) + self.assertIsInstance(mer, MessageReceived) + if mer.message.routing_key == 'myqueue2': + self.assertIsNone(our_message) + our_message = mer + + # Should receive nothing, since not acked + self.assertIsNone(self.amqp.drain(wait=2)) + + # ack and receive + our_message.message.ack() + mer = self.amqp.drain(wait=1) # 2nd + self.assertIsInstance(mer, MessageReceived) + mer.message.ack() + mer = self.amqp.drain(wait=1) # 3rd + self.assertIsInstance(mer, MessageReceived) + mer.message.ack()