Skip to content
Snippets Groups Projects

Fixes #7

Merged Piotr Maślanka requested to merge issue-#7 into milestone-2.0.0
6 files
+ 19
28
Compare changes
  • Side-by-side
  • Inline
Files
6
@@ -88,7 +88,7 @@ class Consumer(Channeler):
consumer, or an int (prefetch window only).
If an int is passed, prefetch size will be set to 0 (which means
undefined), and this int will be used for prefetch window
:type qos: tuple(int, int) or tuple(None, int) or int
:type qos: int, prefetch_count to use
:param cancel_on_failure: Consumer will cancel itself when link goes
down
:type cancel_on_failure: bool
@@ -118,7 +118,7 @@ class Consumer(Channeler):
'hb_watch', 'deliver_watch', 'span')
def __init__(self, queue, on_message, span=None,
no_ack=True, qos=None,
no_ack=True, qos=0,
cancel_on_failure=False,
future_to_notify=None,
fail_on_first_time_resource_locked=False,
@@ -144,7 +144,7 @@ class Consumer(Channeler):
self.channel_close_sent = False # for avoiding situations where ChannelClose is sent twice
# if this is not None, then it has an attribute
# on_cancel_customer(Consumer instance)
self.qos = _qosify(qos)
self.qos = qos
self.qos_update_sent = False # QoS was not sent to server
self.future_to_notify = future_to_notify
@@ -162,18 +162,16 @@ class Consumer(Channeler):
oneshots=True) #: public, called on Customer Cancel Notification
# (RabbitMQ)
def set_qos(self, prefetch_size, prefetch_count): # type: (int, int) -> None
def set_qos(self, prefetch_count): # type: (int, int) -> None
"""
Set new QoS for this consumer.
:param prefetch_size: prefetch in octets
:param prefetch_count: prefetch in whole messages
:type prefetch_count: int
"""
if prefetch_size:
warnings.warn('RabbitMQ stopped supporting prefetch_sizes, will use 0 anyway', DeprecationWarning)
if self.state == ST_ONLINE:
self.method(BasicQos(0, prefetch_count, False))
self.qos = 0, prefetch_count
self.qos = prefetch_count
def cancel(self): # type: () -> Future
"""
@@ -427,9 +425,9 @@ class Consumer(Channeler):
# default exchange, pretend it was bind ok
self.on_setup(QueueBindOk())
elif isinstance(payload, QueueBindOk):
if self.qos is not None:
if self.qos:
self.method_and_watch(
BasicQos(0, self.qos[1], False),
BasicQos(0, self.qos, False),
BasicQosOk,
self.on_setup
)
@@ -474,17 +472,8 @@ class Consumer(Channeler):
return
# resend QoS, in case of sth
if self.qos is not None:
self.set_qos(0, self.qos[1])
def _qosify(qos):
if qos is not None:
if isinstance(qos, int):
qos = 0, qos
elif qos[0] is None:
qos = 0, qos[1] # prefetch_size=0=undefined
return qos
if self.qos:
self.set_qos(self.qos)
class MessageReceiver(object):
Loading