diff --git a/CHANGELOG.md b/CHANGELOG.md index ae19965b35c635a5efc58ac07e09d912795f147b..1b1eb89c80c2c2f7c1dd47854b8872c72d5e7821 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,8 @@ v2.0.0 * changes to Cluster: * declare will refuse to declare an anonymous queue * renamed publish(tx) to publish(confirm) + * declare will expect qos to be given as an integer, and will be set as prefetch_count, since RabbitMQ no longer + supports prefetch_size v1.5.0 ====== diff --git a/coolamqp/__init__.py b/coolamqp/__init__.py index 7ef4bfb6dcd5c1d13153768c1a3081522ab7fdf1..0d6b713b1ca3d0c6c972d6eb2348da0fde32d4b3 100644 --- a/coolamqp/__init__.py +++ b/coolamqp/__init__.py @@ -1 +1 @@ -__version__ = '2.0.0a2' +__version__ = '2.0.0a3' diff --git a/coolamqp/attaches/consumer.py b/coolamqp/attaches/consumer.py index bd43bf3356005f1f9460cb3042cfbe93b69c099e..723998e2d223eb037b902dd62136c9510e01a90d 100644 --- a/coolamqp/attaches/consumer.py +++ b/coolamqp/attaches/consumer.py @@ -88,7 +88,7 @@ class Consumer(Channeler): consumer, or an int (prefetch window only). If an int is passed, prefetch size will be set to 0 (which means undefined), and this int will be used for prefetch window - :type qos: tuple(int, int) or tuple(None, int) or int + :type qos: int, prefetch_count to use :param cancel_on_failure: Consumer will cancel itself when link goes down :type cancel_on_failure: bool @@ -118,7 +118,7 @@ class Consumer(Channeler): 'hb_watch', 'deliver_watch', 'span') def __init__(self, queue, on_message, span=None, - no_ack=True, qos=None, + no_ack=True, qos=0, cancel_on_failure=False, future_to_notify=None, fail_on_first_time_resource_locked=False, @@ -144,7 +144,7 @@ class Consumer(Channeler): self.channel_close_sent = False # for avoiding situations where ChannelClose is sent twice # if this is not None, then it has an attribute # on_cancel_customer(Consumer instance) - self.qos = _qosify(qos) + self.qos = qos self.qos_update_sent = False # QoS was not sent to server self.future_to_notify = future_to_notify @@ -162,18 +162,16 @@ class Consumer(Channeler): oneshots=True) #: public, called on Customer Cancel Notification # (RabbitMQ) - def set_qos(self, prefetch_size, prefetch_count): # type: (int, int) -> None + def set_qos(self, prefetch_count): # type: (int, int) -> None """ Set new QoS for this consumer. - :param prefetch_size: prefetch in octets :param prefetch_count: prefetch in whole messages + :type prefetch_count: int """ - if prefetch_size: - warnings.warn('RabbitMQ stopped supporting prefetch_sizes, will use 0 anyway', DeprecationWarning) if self.state == ST_ONLINE: self.method(BasicQos(0, prefetch_count, False)) - self.qos = 0, prefetch_count + self.qos = prefetch_count def cancel(self): # type: () -> Future """ @@ -427,9 +425,9 @@ class Consumer(Channeler): # default exchange, pretend it was bind ok self.on_setup(QueueBindOk()) elif isinstance(payload, QueueBindOk): - if self.qos is not None: + if self.qos: self.method_and_watch( - BasicQos(0, self.qos[1], False), + BasicQos(0, self.qos, False), BasicQosOk, self.on_setup ) @@ -474,17 +472,8 @@ class Consumer(Channeler): return # resend QoS, in case of sth - if self.qos is not None: - self.set_qos(0, self.qos[1]) - - -def _qosify(qos): - if qos is not None: - if isinstance(qos, int): - qos = 0, qos - elif qos[0] is None: - qos = 0, qos[1] # prefetch_size=0=undefined - return qos + if self.qos: + self.set_qos(self.qos) class MessageReceiver(object): diff --git a/tests/test_attaches/test_consumer.py b/tests/test_attaches/test_consumer.py index 8173a1cec6edf2ab2bb5d07d97770f37cde45a0d..25ac677745cd5bd433d90316347f935f2d1eeb81 100644 --- a/tests/test_attaches/test_consumer.py +++ b/tests/test_attaches/test_consumer.py @@ -11,4 +11,4 @@ class TestConsumer(unittest.TestCase): def test_issue_26(self): """Support for passing qos as int""" cons = Consumer(Queue('wtf'), lambda msg: None, qos=25) - self.assertEqual(cons.qos, (0, 25)) + self.assertEqual(cons.qos, 25) diff --git a/tests/test_clustering/test_a.py b/tests/test_clustering/test_a.py index 1012f3a1d5590da3f2187dd0f2478f8d4b8a3858..b4588db299cf0b316b6b3df664f9791fb79b58e7 100644 --- a/tests/test_clustering/test_a.py +++ b/tests/test_clustering/test_a.py @@ -75,13 +75,13 @@ class TestA(unittest.TestCase): fut.result() - con.set_qos(0, 100) + con.set_qos(100) time.sleep(1) - self.assertEqual(con.qos, (0, 100)) + self.assertEqual(con.qos, 100) - con.set_qos(None, 110) + con.set_qos(110) time.sleep(1) - self.assertEqual(con.qos, (0, 110)) + self.assertEqual(con.qos, 110) def test_declare_anonymous(self): xchg = Exchange('wtfzomg', type='fanout') diff --git a/tests/test_clustering/test_double.py b/tests/test_clustering/test_double.py index a723c861d943047433f84c0fdd1cfedb08fa1a69..b9afbbcb33243c392e9a610cddd7c46c47d84005 100644 --- a/tests/test_clustering/test_double.py +++ b/tests/test_clustering/test_double.py @@ -54,7 +54,7 @@ class TestDouble(unittest.TestCase): q = Queue(u'yo', exclusive=True, auto_delete=True) - con, fut = self.c1.consume(q, qos=(None, 20)) + con, fut = self.c1.consume(q, qos=20) fut.result() try: