Skip to content
Snippets Groups Projects

CoolAMQP 2.0.0

Merged Piotr Maślanka requested to merge milestone-2.0.0 into develop
Files
21
@@ -84,14 +84,8 @@ class Consumer(Channeler):
:param span: optional span, if opentracing is installed
:param no_ack: Will this consumer require acknowledges from messages?
:type no_ack: bool
:param qos: a tuple of (prefetch size, prefetch window) for this
consumer, or an int (prefetch window only).
If an int is passed, prefetch size will be set to 0 (which means
undefined), and this int will be used for prefetch window
:type qos: tuple(int, int) or tuple(None, int) or int
:param cancel_on_failure: Consumer will cancel itself when link goes
down
:type cancel_on_failure: bool
:param qos: maximum messages to send to client that can stay unacknowledged (qos.prefetch_count)
:type qos: int, prefetch_count to use
:param future_to_notify: Future to succeed when this consumer goes
online for the first time.
This future can also raise with AMQPError if
@@ -113,13 +107,12 @@ class Consumer(Channeler):
__slots__ = ('queue', 'no_ack', 'on_message', 'cancelled', 'receiver',
'attache_group', 'channel_close_sent', 'qos', 'qos_update_sent',
'future_to_notify', 'future_to_notify_on_dead',
'fail_on_first_time_resource_locked', 'cancel_on_failure',
'fail_on_first_time_resource_locked',
'body_receive_mode', 'consumer_tag', 'on_cancel', 'on_broker_cancel',
'hb_watch', 'deliver_watch', 'span')
def __init__(self, queue, on_message, span=None,
no_ack=True, qos=None,
cancel_on_failure=False,
no_ack=True, qos=0,
future_to_notify=None,
fail_on_first_time_resource_locked=False,
body_receive_mode=BodyReceiveMode.BYTES
@@ -144,14 +137,13 @@ class Consumer(Channeler):
self.channel_close_sent = False # for avoiding situations where ChannelClose is sent twice
# if this is not None, then it has an attribute
# on_cancel_customer(Consumer instance)
self.qos = _qosify(qos)
self.qos = qos
self.qos_update_sent = False # QoS was not sent to server
self.future_to_notify = future_to_notify
self.future_to_notify_on_dead = None # .cancel
self.fail_on_first_time_resource_locked = fail_on_first_time_resource_locked
self.cancel_on_failure = cancel_on_failure
self.body_receive_mode = body_receive_mode
self.consumer_tag = None
@@ -162,18 +154,16 @@ class Consumer(Channeler):
oneshots=True) #: public, called on Customer Cancel Notification
# (RabbitMQ)
def set_qos(self, prefetch_size, prefetch_count): # type: (int, int) -> None
def set_qos(self, prefetch_count): # type: (int, int) -> None
"""
Set new QoS for this consumer.
:param prefetch_size: prefetch in octets
:param prefetch_count: prefetch in whole messages
:type prefetch_count: int
"""
if prefetch_size:
warnings.warn('RabbitMQ stopped supporting prefetch_sizes, will use 0 anyway', DeprecationWarning)
if self.state == ST_ONLINE:
self.method(BasicQos(0, prefetch_count, False))
self.qos = 0, prefetch_count
self.qos = prefetch_count
def cancel(self): # type: () -> Future
"""
@@ -245,9 +235,7 @@ class Consumer(Channeler):
Note, this can be called multiple times, and eventually with None.
"""
if self.cancel_on_failure and (not self.cancelled):
logger.debug(
'Consumer is cancel_on_failure and failure seen, True->cancelled')
if not self.cancelled:
self.cancelled = True
self.on_cancel()
@@ -412,24 +400,25 @@ class Consumer(Channeler):
queue_declared = False
# We need any form of binding.
if self.queue.exchange is not None:
if self.queue.exchange.type != b'topic':
queue_declared = True
self.method_and_watch(
QueueBind(
self.queue.name,
self.queue.exchange.name.encode('utf8'),
b'', False, []),
QueueBindOk,
self.on_setup
)
queue_declared = True
qb = QueueBind(
self.queue.name,
self.queue.exchange.name.encode('utf-8'),
self.queue.routing_key, False, [])
logger.debug('Running %s' % (repr(qb)))
self.method_and_watch(
qb,
QueueBindOk,
self.on_setup
)
if not queue_declared:
# default exchange, pretend it was bind ok
self.on_setup(QueueBindOk())
elif isinstance(payload, QueueBindOk):
if self.qos is not None:
if self.qos:
self.method_and_watch(
BasicQos(0, self.qos[1], False),
BasicQos(0, self.qos, False),
BasicQosOk,
self.on_setup
)
@@ -474,17 +463,8 @@ class Consumer(Channeler):
return
# resend QoS, in case of sth
if self.qos is not None:
self.set_qos(0, self.qos[1])
def _qosify(qos):
if qos is not None:
if isinstance(qos, int):
qos = 0, qos
elif qos[0] is None:
qos = 0, qos[1] # prefetch_size=0=undefined
return qos
if self.qos:
self.set_qos(self.qos)
class MessageReceiver(object):
Loading