Skip to content
Snippets Groups Projects
Commit 4ac48845 authored by Piotr Maślanka's avatar Piotr Maślanka
Browse files

Merge branch 'issue-#7' into 'milestone-2.0.0'

Fixes #7

See merge request !5
parents 7feb9f43 344d577b
No related branches found
No related tags found
2 merge requests!6CoolAMQP 2.0.0,!5Fixes #7
Pipeline #63434 passed with stages
in 2 minutes and 16 seconds
......@@ -14,6 +14,8 @@ v2.0.0
* changes to Cluster:
* declare will refuse to declare an anonymous queue
* renamed publish(tx) to publish(confirm)
* declare will expect qos to be given as an integer, and will be set as prefetch_count, since RabbitMQ no longer
supports prefetch_size
v1.5.0
======
......
__version__ = '2.0.0a2'
__version__ = '2.0.0a3'
......@@ -88,7 +88,7 @@ class Consumer(Channeler):
consumer, or an int (prefetch window only).
If an int is passed, prefetch size will be set to 0 (which means
undefined), and this int will be used for prefetch window
:type qos: tuple(int, int) or tuple(None, int) or int
:type qos: int, prefetch_count to use
:param cancel_on_failure: Consumer will cancel itself when link goes
down
:type cancel_on_failure: bool
......@@ -118,7 +118,7 @@ class Consumer(Channeler):
'hb_watch', 'deliver_watch', 'span')
def __init__(self, queue, on_message, span=None,
no_ack=True, qos=None,
no_ack=True, qos=0,
cancel_on_failure=False,
future_to_notify=None,
fail_on_first_time_resource_locked=False,
......@@ -144,7 +144,7 @@ class Consumer(Channeler):
self.channel_close_sent = False # for avoiding situations where ChannelClose is sent twice
# if this is not None, then it has an attribute
# on_cancel_customer(Consumer instance)
self.qos = _qosify(qos)
self.qos = qos
self.qos_update_sent = False # QoS was not sent to server
self.future_to_notify = future_to_notify
......@@ -162,18 +162,16 @@ class Consumer(Channeler):
oneshots=True) #: public, called on Customer Cancel Notification
# (RabbitMQ)
def set_qos(self, prefetch_size, prefetch_count): # type: (int, int) -> None
def set_qos(self, prefetch_count): # type: (int, int) -> None
"""
Set new QoS for this consumer.
:param prefetch_size: prefetch in octets
:param prefetch_count: prefetch in whole messages
:type prefetch_count: int
"""
if prefetch_size:
warnings.warn('RabbitMQ stopped supporting prefetch_sizes, will use 0 anyway', DeprecationWarning)
if self.state == ST_ONLINE:
self.method(BasicQos(0, prefetch_count, False))
self.qos = 0, prefetch_count
self.qos = prefetch_count
def cancel(self): # type: () -> Future
"""
......@@ -427,9 +425,9 @@ class Consumer(Channeler):
# default exchange, pretend it was bind ok
self.on_setup(QueueBindOk())
elif isinstance(payload, QueueBindOk):
if self.qos is not None:
if self.qos:
self.method_and_watch(
BasicQos(0, self.qos[1], False),
BasicQos(0, self.qos, False),
BasicQosOk,
self.on_setup
)
......@@ -474,17 +472,8 @@ class Consumer(Channeler):
return
# resend QoS, in case of sth
if self.qos is not None:
self.set_qos(0, self.qos[1])
def _qosify(qos):
if qos is not None:
if isinstance(qos, int):
qos = 0, qos
elif qos[0] is None:
qos = 0, qos[1] # prefetch_size=0=undefined
return qos
if self.qos:
self.set_qos(self.qos)
class MessageReceiver(object):
......
......@@ -11,4 +11,4 @@ class TestConsumer(unittest.TestCase):
def test_issue_26(self):
"""Support for passing qos as int"""
cons = Consumer(Queue('wtf'), lambda msg: None, qos=25)
self.assertEqual(cons.qos, (0, 25))
self.assertEqual(cons.qos, 25)
......@@ -75,13 +75,13 @@ class TestA(unittest.TestCase):
fut.result()
con.set_qos(0, 100)
con.set_qos(100)
time.sleep(1)
self.assertEqual(con.qos, (0, 100))
self.assertEqual(con.qos, 100)
con.set_qos(None, 110)
con.set_qos(110)
time.sleep(1)
self.assertEqual(con.qos, (0, 110))
self.assertEqual(con.qos, 110)
def test_declare_anonymous(self):
xchg = Exchange('wtfzomg', type='fanout')
......
......@@ -54,7 +54,7 @@ class TestDouble(unittest.TestCase):
q = Queue(u'yo', exclusive=True, auto_delete=True)
con, fut = self.c1.consume(q, qos=(None, 20))
con, fut = self.c1.consume(q, qos=20)
fut.result()
try:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment