diff --git a/coolamqp/backends/base.py b/coolamqp/backends/base.py index 9cb7636b3d7a0fb975341079085183160bfc536e..7d4a25cb1bc7f83f3bbb849a3f28f0729dea7238 100644 --- a/coolamqp/backends/base.py +++ b/coolamqp/backends/base.py @@ -99,7 +99,7 @@ class AMQPBackend(object): :param delivery_tag: delivery tag to ack """ - def basic_qos(self, prefetch_size, prefetch_count): + def basic_qos(self, prefetch_size, prefetch_count, global_): """ Issue a basic.qos(prefetch_size, prefetch_count, True) against broker :param prefetch_size: prefetch window size in octets diff --git a/coolamqp/backends/pyamqp.py b/coolamqp/backends/pyamqp.py index c06abdcebc972f256debed54302ecd0a7bdb0813..26e86861a6cb9bf6af701820bd5fa73f0f550f3e 100644 --- a/coolamqp/backends/pyamqp.py +++ b/coolamqp/backends/pyamqp.py @@ -98,8 +98,8 @@ class PyAMQPBackend(AMQPBackend): self.channel.exchange_delete(exchange.name) @translate_exceptions - def basic_qos(self, prefetch_size, prefetch_count): - self.channel.basic_qos(prefetch_size, prefetch_count, True) + def basic_qos(self, prefetch_size, prefetch_count, global_): + self.channel.basic_qos(prefetch_size, prefetch_count, global_) @translate_exceptions def queue_delete(self, queue): diff --git a/coolamqp/cluster.py b/coolamqp/cluster.py index ffdd05805e5f7adbe19c4b016e57ef4e4d06a597..5a5fa5a6626b0bf668315abbe95b02121328e94e 100644 --- a/coolamqp/cluster.py +++ b/coolamqp/cluster.py @@ -164,8 +164,8 @@ class Cluster(object): self.thread.order_queue.append(a) return a - def qos(self, prefetch_window, prefetch_count): - a = SetQoS(prefetch_window, prefetch_count) + def qos(self, prefetch_window, prefetch_count, global_=True): + a = SetQoS(prefetch_window, prefetch_count, global_) self.thread.order_queue.append(a) return a diff --git a/coolamqp/orders.py b/coolamqp/orders.py index 79399feff0abf0a97a542ccfcd898864e7825bea..90271bf6f7d312f088d574d82e13f718c2e74347 100644 --- a/coolamqp/orders.py +++ b/coolamqp/orders.py @@ -113,9 +113,9 @@ class CancelQueue(_Queue): class SetQoS(Order): """Set QoS""" - def __init__(self, prefetch_window, prefetch_count, on_completed=None, on_failed=None): + def __init__(self, prefetch_window, prefetch_count, global_, on_completed=None, on_failed=None): Order.__init__(self, on_completed=on_completed, on_failed=on_failed) - self.qos = prefetch_window, prefetch_count + self.qos = prefetch_window, prefetch_count, global_ 1 class _AcksAndNacks(Order): diff --git a/tests/test_noack.py b/tests/test_noack.py index faa5925b29ec56f185a5690420d8589a9eda9937..f13448757793d07a0d54c324d4860cad5dec5c3a 100644 --- a/tests/test_noack.py +++ b/tests/test_noack.py @@ -20,7 +20,7 @@ class TestNoAcknowledge(unittest.TestCase): def test_noack_works(self): myq = Queue('myqueue', exclusive=True) - self.amqp.qos(0,1 ) + self.amqp.qos(0, 1, False) self.amqp.consume(myq, no_ack=True) @@ -35,7 +35,7 @@ class TestNoAcknowledge(unittest.TestCase): def test_noack_works_after_restart(self): myq = Queue('myqueue', exclusive=True) - self.amqp.qos(0, 1) + self.amqp.qos(0, 1, False) self.amqp.consume(myq, no_ack=True) @@ -59,23 +59,24 @@ class TestNoAcknowledge(unittest.TestCase): self.assertIsInstance(self.amqp.drain(wait=0.3), MessageReceived) self.assertIsInstance(self.amqp.drain(wait=0.3), MessageReceived) - def test_noack_coexists(self): myq = Queue('myqueue', exclusive=True) my2 = Queue('myqueue2', exclusive=True) - self.amqp.qos(0, 1) + self.amqp.qos(0, 1, False) self.amqp.consume(myq, no_ack=True) - self.amqp.consume(my2).result() + self.amqp.consume(my2) + + msg = Message(b'') - self.amqp.send(Message(b''), routing_key='myqueue') - self.amqp.send(Message(b''), routing_key='myqueue') - self.amqp.send(Message(b''), routing_key='myqueue') + self.amqp.send(msg, routing_key='myqueue') + self.amqp.send(msg, routing_key='myqueue') + self.amqp.send(msg, routing_key='myqueue') - self.amqp.send(Message(b''), routing_key='myqueue2') - self.amqp.send(Message(b''), routing_key='myqueue2') - self.amqp.send(Message(b''), routing_key='myqueue2') + self.amqp.send(msg, routing_key='myqueue2') + self.amqp.send(msg, routing_key='myqueue2') + self.amqp.send(msg, routing_key='myqueue2').result() our_message = None for i in range(4): @@ -84,6 +85,7 @@ class TestNoAcknowledge(unittest.TestCase): if mer.message.routing_key == 'myqueue2': self.assertIsNone(our_message) our_message = mer + self.assertIsNotNone(our_message) # Should receive nothing, since not acked self.assertIsNone(self.amqp.drain(wait=2))