From 161ac731260738a2dbc1cccfb62f45324a88d1b3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Ma=C5=9Blanka?= <piotr.maslanka@henrietta.com.pl> Date: Sun, 25 Dec 2016 01:49:40 +0100 Subject: [PATCH] v0.11 RC, submit for tests --- coolamqp/backends/base.py | 2 +- coolamqp/backends/pyamqp.py | 4 ++-- coolamqp/cluster.py | 4 ++-- coolamqp/orders.py | 4 ++-- tests/test_noack.py | 24 +++++++++++++----------- 5 files changed, 20 insertions(+), 18 deletions(-) diff --git a/coolamqp/backends/base.py b/coolamqp/backends/base.py index 9cb7636..7d4a25c 100644 --- a/coolamqp/backends/base.py +++ b/coolamqp/backends/base.py @@ -99,7 +99,7 @@ class AMQPBackend(object): :param delivery_tag: delivery tag to ack """ - def basic_qos(self, prefetch_size, prefetch_count): + def basic_qos(self, prefetch_size, prefetch_count, global_): """ Issue a basic.qos(prefetch_size, prefetch_count, True) against broker :param prefetch_size: prefetch window size in octets diff --git a/coolamqp/backends/pyamqp.py b/coolamqp/backends/pyamqp.py index c06abdc..26e8686 100644 --- a/coolamqp/backends/pyamqp.py +++ b/coolamqp/backends/pyamqp.py @@ -98,8 +98,8 @@ class PyAMQPBackend(AMQPBackend): self.channel.exchange_delete(exchange.name) @translate_exceptions - def basic_qos(self, prefetch_size, prefetch_count): - self.channel.basic_qos(prefetch_size, prefetch_count, True) + def basic_qos(self, prefetch_size, prefetch_count, global_): + self.channel.basic_qos(prefetch_size, prefetch_count, global_) @translate_exceptions def queue_delete(self, queue): diff --git a/coolamqp/cluster.py b/coolamqp/cluster.py index ffdd058..5a5fa5a 100644 --- a/coolamqp/cluster.py +++ b/coolamqp/cluster.py @@ -164,8 +164,8 @@ class Cluster(object): self.thread.order_queue.append(a) return a - def qos(self, prefetch_window, prefetch_count): - a = SetQoS(prefetch_window, prefetch_count) + def qos(self, prefetch_window, prefetch_count, global_=True): + a = SetQoS(prefetch_window, prefetch_count, global_) self.thread.order_queue.append(a) return a diff --git a/coolamqp/orders.py b/coolamqp/orders.py index 79399fe..90271bf 100644 --- a/coolamqp/orders.py +++ b/coolamqp/orders.py @@ -113,9 +113,9 @@ class CancelQueue(_Queue): class SetQoS(Order): """Set QoS""" - def __init__(self, prefetch_window, prefetch_count, on_completed=None, on_failed=None): + def __init__(self, prefetch_window, prefetch_count, global_, on_completed=None, on_failed=None): Order.__init__(self, on_completed=on_completed, on_failed=on_failed) - self.qos = prefetch_window, prefetch_count + self.qos = prefetch_window, prefetch_count, global_ 1 class _AcksAndNacks(Order): diff --git a/tests/test_noack.py b/tests/test_noack.py index faa5925..f134487 100644 --- a/tests/test_noack.py +++ b/tests/test_noack.py @@ -20,7 +20,7 @@ class TestNoAcknowledge(unittest.TestCase): def test_noack_works(self): myq = Queue('myqueue', exclusive=True) - self.amqp.qos(0,1 ) + self.amqp.qos(0, 1, False) self.amqp.consume(myq, no_ack=True) @@ -35,7 +35,7 @@ class TestNoAcknowledge(unittest.TestCase): def test_noack_works_after_restart(self): myq = Queue('myqueue', exclusive=True) - self.amqp.qos(0, 1) + self.amqp.qos(0, 1, False) self.amqp.consume(myq, no_ack=True) @@ -59,23 +59,24 @@ class TestNoAcknowledge(unittest.TestCase): self.assertIsInstance(self.amqp.drain(wait=0.3), MessageReceived) self.assertIsInstance(self.amqp.drain(wait=0.3), MessageReceived) - def test_noack_coexists(self): myq = Queue('myqueue', exclusive=True) my2 = Queue('myqueue2', exclusive=True) - self.amqp.qos(0, 1) + self.amqp.qos(0, 1, False) self.amqp.consume(myq, no_ack=True) - self.amqp.consume(my2).result() + self.amqp.consume(my2) + + msg = Message(b'') - self.amqp.send(Message(b''), routing_key='myqueue') - self.amqp.send(Message(b''), routing_key='myqueue') - self.amqp.send(Message(b''), routing_key='myqueue') + self.amqp.send(msg, routing_key='myqueue') + self.amqp.send(msg, routing_key='myqueue') + self.amqp.send(msg, routing_key='myqueue') - self.amqp.send(Message(b''), routing_key='myqueue2') - self.amqp.send(Message(b''), routing_key='myqueue2') - self.amqp.send(Message(b''), routing_key='myqueue2') + self.amqp.send(msg, routing_key='myqueue2') + self.amqp.send(msg, routing_key='myqueue2') + self.amqp.send(msg, routing_key='myqueue2').result() our_message = None for i in range(4): @@ -84,6 +85,7 @@ class TestNoAcknowledge(unittest.TestCase): if mer.message.routing_key == 'myqueue2': self.assertIsNone(our_message) our_message = mer + self.assertIsNotNone(our_message) # Should receive nothing, since not acked self.assertIsNone(self.amqp.drain(wait=2)) -- GitLab