From 19a2c1e924894fe574c55e85a92369740a24135c Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Piotr=20Ma=C5=9Blanka?= <piotr.maslanka@henrietta.com.pl>
Date: Sun, 25 Dec 2016 00:02:30 +0100
Subject: [PATCH] fixed #6

---
 coolamqp/backends/base.py   |  3 +-
 coolamqp/backends/pyamqp.py |  3 +-
 coolamqp/cluster.py         |  5 +-
 coolamqp/handler.py         | 14 +++---
 coolamqp/orders.py          |  3 ++
 setup.py                    |  4 +-
 tests/test_failures.py      | 27 ++++++++++
 tests/test_noack.py         | 98 +++++++++++++++++++++++++++++++++++++
 8 files changed, 144 insertions(+), 13 deletions(-)
 create mode 100644 tests/test_noack.py

diff --git a/coolamqp/backends/base.py b/coolamqp/backends/base.py
index ff0f021..9cb7636 100644
--- a/coolamqp/backends/base.py
+++ b/coolamqp/backends/base.py
@@ -86,10 +86,11 @@ class AMQPBackend(object):
         :param consumer_tag: consumer_tag to cancel
         """
 
-    def basic_consume(self, queue):
+    def basic_consume(self, queue, no_ack=False):
         """
         Start consuming from a queue
         :param queue: Queue object
+        :param no_ack: Messages will not need to be ack()ed for this queue
         """
 
     def basic_ack(self, delivery_tag):
diff --git a/coolamqp/backends/pyamqp.py b/coolamqp/backends/pyamqp.py
index 4df7edc..c06abdc 100644
--- a/coolamqp/backends/pyamqp.py
+++ b/coolamqp/backends/pyamqp.py
@@ -128,7 +128,7 @@ class PyAMQPBackend(AMQPBackend):
             queue.name = qname
 
     @translate_exceptions
-    def basic_consume(self, queue):
+    def basic_consume(self, queue, no_ack=False):
         """
         Start consuming from a queue
         :param queue: Queue object
@@ -136,6 +136,7 @@ class PyAMQPBackend(AMQPBackend):
         self.channel.basic_consume(queue.name,
                                    consumer_tag=queue.consumer_tag,
                                    exclusive=queue.exclusive,
+                                   no_ack=no_ack,
                                    callback=self.__on_message,
                                    on_cancel=self.__on_consumercancelled)
 
diff --git a/coolamqp/cluster.py b/coolamqp/cluster.py
index 5d5bb5c..ffdd058 100644
--- a/coolamqp/cluster.py
+++ b/coolamqp/cluster.py
@@ -169,7 +169,7 @@ class Cluster(object):
         self.thread.order_queue.append(a)
         return a
 
-    def consume(self, queue, on_completed=None, on_failed=None):
+    def consume(self, queue, no_ack=False, on_completed=None, on_failed=None):
         """
         Start consuming from a queue
 
@@ -178,10 +178,11 @@ class Cluster(object):
 
         :param queue: Queue to consume from
         :param on_completed: callable/0 to call when this succeeds
+        :param no_ack: if True, you will not need to call .ack() for this queue
         :param on_failed: callable/1 to call when this fails with AMQPError instance
         :return: a Future with this order's status
         """
-        a = ConsumeQueue(queue, on_completed=on_completed, on_failed=on_failed)
+        a = ConsumeQueue(queue, no_ack=no_ack, on_completed=on_completed, on_failed=on_failed)
         self.thread.order_queue.append(a)
         return a
 
diff --git a/coolamqp/handler.py b/coolamqp/handler.py
index f823639..8ac0041 100644
--- a/coolamqp/handler.py
+++ b/coolamqp/handler.py
@@ -39,7 +39,7 @@ class ClusterHandlerThread(threading.Thread):
         self.connect_id = -1                # connectID of current connection
 
         self.declared_exchanges = {}        # declared exchanges, by their names
-        self.queues_by_consumer_tags = {}   # subbed queues, by their consumer tags
+        self.queues_by_consumer_tags = {}   # tuple of (subbed queue, no_ack::bool), by consumer tags
 
         self.backend = None
         self.first_connect = True
@@ -67,11 +67,11 @@ class ClusterHandlerThread(threading.Thread):
                 for exchange in self.declared_exchanges.values():
                     self.backend.exchange_declare(exchange)
 
-                for queue in self.queues_by_consumer_tags.values():
+                for queue, no_ack in self.queues_by_consumer_tags.values():
                     self.backend.queue_declare(queue)
                     if queue.exchange is not None:
                         self.backend.queue_bind(queue, queue.exchange)
-                    self.backend.basic_consume(queue)
+                    self.backend.basic_consume(queue, no_ack=no_ack)
 
             except ConnectionFailedError as e:
                 # a connection failure happened :(
@@ -126,11 +126,11 @@ class ClusterHandlerThread(threading.Thread):
                 if order.queue.exchange is not None:
                     self.backend.queue_bind(order.queue, order.queue.exchange)
 
-                self.backend.basic_consume(order.queue)
-                self.queues_by_consumer_tags[order.queue.consumer_tag] = order.queue
+                self.backend.basic_consume(order.queue, no_ack=order.no_ack)
+                self.queues_by_consumer_tags[order.queue.consumer_tag] = order.queue, order.no_ack
             elif isinstance(order, CancelQueue):
                 try:
-                    q = self.queues_by_consumer_tags.pop(order.queue.consumer_tag)
+                    q, no_ack = self.queues_by_consumer_tags.pop(order.queue.consumer_tag)
                 except KeyError:
                     pass  # wat?
                 else:
@@ -209,7 +209,7 @@ class ClusterHandlerThread(threading.Thread):
         A consumer has been cancelled
         """
         try:
-            queue = self.queues_by_consumer_tags.pop(consumer_tag)
+            queue, no_ack = self.queues_by_consumer_tags.pop(consumer_tag)
         except KeyError:
             return  # what?
 
diff --git a/coolamqp/orders.py b/coolamqp/orders.py
index 53cbb65..ddcaa19 100644
--- a/coolamqp/orders.py
+++ b/coolamqp/orders.py
@@ -93,6 +93,9 @@ class DeclareQueue(_Queue):
 
 class ConsumeQueue(_Queue):
     """Declare and consume from a queue"""
+    def __init__(self, queue, no_ack=False, on_completed=None, on_failed=None):
+        _Queue.__init__(self, queue, on_completed=on_completed, on_failed=on_failed)
+        self.no_ack = no_ack
 
 class DeleteQueue(_Queue):
     """Delete a queue"""
diff --git a/setup.py b/setup.py
index c87da9d..699839a 100644
--- a/setup.py
+++ b/setup.py
@@ -3,12 +3,12 @@
 from setuptools import setup
 
 setup(name='CoolAMQP',
-      version='0.10',
+      version='0.11',
       description='AMQP client with sane reconnects',
       author='DMS Serwis s.c.',
       author_email='piotrm@smok.co',
       url='https://github.com/smok-serwis/coolamqp',
-      download_url='https://github.com/smok-serwis/coolamqp/archive/v0.10.zip',
+      download_url='https://github.com/smok-serwis/coolamqp/archive/v0.11.zip',
       keywords=['amqp', 'pyamqp', 'rabbitmq', 'client', 'network', 'ha', 'high availability'],
       packages=[
           'coolamqp',
diff --git a/tests/test_failures.py b/tests/test_failures.py
index 5eb6a96..2349fc9 100644
--- a/tests/test_failures.py
+++ b/tests/test_failures.py
@@ -93,6 +93,33 @@ class TestFailures(unittest.TestCase):
         self.assertIsInstance(self.amqp.drain(wait=10), ConnectionUp)
         self.assertIsNone(self.amqp.drain(wait=6))    # message is NOT received
 
+    def test_qos_after_failure(self):
+        self.amqp.qos(0, 1)
+
+        self.amqp.consume(Queue('lol', exclusive=True)).result()
+        self.amqp.send(Message(b'what the fuck'), '', routing_key='lol')
+        self.amqp.send(Message(b'what the fuck'), '', routing_key='lol')
+
+        p = self.amqp.drain(wait=4)
+        self.assertIsInstance(p, MessageReceived)
+
+        self.assertIsNone(self.amqp.drain(wait=5))
+        p.message.ack()
+        self.assertIsInstance(self.amqp.drain(wait=4), MessageReceived)
+
+        os.system("sudo service rabbitmq-server restart")
+        self.assertIsInstance(self.amqp.drain(wait=6), ConnectionDown)
+        self.assertIsInstance(self.amqp.drain(wait=6), ConnectionUp)
+
+        self.amqp.send(Message(b'what the fuck'), '', routing_key='lol')
+        self.amqp.send(Message(b'what the fuck'), '', routing_key='lol')
+
+        p = self.amqp.drain(wait=4)
+        self.assertIsInstance(p, MessageReceived)
+
+        self.assertIsNone(self.amqp.drain(wait=5))
+        p.message.ack()
+        self.assertIsInstance(self.amqp.drain(wait=4), MessageReceived)
 
     def test_connection_down_and_up_redeclare_queues(self):
         """are messages generated at all? does it reconnect?"""
diff --git a/tests/test_noack.py b/tests/test_noack.py
new file mode 100644
index 0000000..440aaf8
--- /dev/null
+++ b/tests/test_noack.py
@@ -0,0 +1,98 @@
+# coding=UTF-8
+from __future__ import absolute_import, division, print_function
+import unittest
+import six
+import os
+
+from coolamqp import Cluster, ClusterNode, Queue, MessageReceived, ConnectionUp, ConnectionDown, ConsumerCancelled, Message, Exchange
+
+
+
+class TestNoAcknowledge(unittest.TestCase):
+    def setUp(self):
+        self.amqp = Cluster([ClusterNode('127.0.0.1', 'guest', 'guest')])
+        self.amqp.start()
+        self.assertIsInstance(self.amqp.drain(1), ConnectionUp)
+
+    def tearDown(self):
+        self.amqp.shutdown()
+
+    def test_noack_works(self):
+        myq = Queue('myqueue', exclusive=True)
+
+        self.amqp.qos(0,1 )
+
+        self.amqp.consume(myq, no_ack=True)
+
+        self.amqp.send(Message(b'what the fuck'), '', routing_key='myqueue')
+        self.amqp.send(Message(b'what the fuck'), '', routing_key='myqueue')
+        self.amqp.send(Message(b'what the fuck'), '', routing_key='myqueue')
+
+        self.assertIsInstance(self.amqp.drain(wait=1), MessageReceived)
+        self.assertIsInstance(self.amqp.drain(wait=0.3), MessageReceived)
+        self.assertIsInstance(self.amqp.drain(wait=0.3), MessageReceived)
+
+    def test_noack_works_after_restart(self):
+        myq = Queue('myqueue', exclusive=True)
+
+        self.amqp.qos(0,1 )
+
+        self.amqp.consume(myq, no_ack=True)
+
+        self.amqp.send(Message(b'what the fuck'), '', routing_key='myqueue')
+        self.amqp.send(Message(b'what the fuck'), '', routing_key='myqueue')
+        self.amqp.send(Message(b'what the fuck'), '', routing_key='myqueue')
+
+        self.assertIsInstance(self.amqp.drain(wait=1), MessageReceived)
+        self.assertIsInstance(self.amqp.drain(wait=0.3), MessageReceived)
+        self.assertIsInstance(self.amqp.drain(wait=0.3), MessageReceived)
+
+        os.system("sudo service rabbitmq-server restart")
+        self.assertIsInstance(self.amqp.drain(wait=5), ConnectionDown)
+        self.assertIsInstance(self.amqp.drain(wait=5), ConnectionUp)
+
+        self.amqp.send(Message(b'what the fuck'), '', routing_key='myqueue')
+        self.amqp.send(Message(b'what the fuck'), '', routing_key='myqueue')
+        self.amqp.send(Message(b'what the fuck'), '', routing_key='myqueue')
+
+        self.assertIsInstance(self.amqp.drain(wait=1), MessageReceived)
+        self.assertIsInstance(self.amqp.drain(wait=0.3), MessageReceived)
+        self.assertIsInstance(self.amqp.drain(wait=0.3), MessageReceived)
+
+
+    def test_noack_coexists(self):
+        myq = Queue('myqueue', exclusive=True)
+        my2 = Queue('myqueue2', exclusive=True)
+
+        self.amqp.qos(0, 1)
+
+        self.amqp.consume(myq, no_ack=True)
+        self.amqp.consume(my2)
+
+        self.amqp.send(Message(b'what the fuck'), '', routing_key='myqueue')
+        self.amqp.send(Message(b'what the fuck'), '', routing_key='myqueue')
+        self.amqp.send(Message(b'what the fuck'), '', routing_key='myqueue')
+
+        self.amqp.send(Message(b'what the fuck'), '', routing_key='myqueue2')
+        self.amqp.send(Message(b'what the fuck'), '', routing_key='myqueue2')
+        self.amqp.send(Message(b'what the fuck'), '', routing_key='myqueue2')
+
+        our_message = None
+        for i in xrange(0, 4):
+            mer = self.amqp.drain(wait=1)
+            self.assertIsInstance(mer, MessageReceived)
+            if mer.message.routing_key == 'myqueue2':
+                self.assertIsNone(our_message)
+                our_message = mer
+
+        # Should receive nothing, since not acked
+        self.assertIsNone(self.amqp.drain(wait=2))
+
+        # ack and receive
+        our_message.message.ack()
+        mer = self.amqp.drain(wait=1)       # 2nd
+        self.assertIsInstance(mer, MessageReceived)
+        mer.message.ack()
+        mer = self.amqp.drain(wait=1)       # 3rd
+        self.assertIsInstance(mer, MessageReceived)
+        mer.message.ack()
-- 
GitLab