Skip to content
Snippets Groups Projects

Issue #15

Merged Piotr Maślanka requested to merge issue-#15 into develop
14 files
+ 304
30
Compare changes
  • Side-by-side
  • Inline
Files
14
@@ -18,6 +18,7 @@ from coolamqp.framing.definitions import ChannelOpenOk, BasicConsume, \
BasicAck, BasicReject, RESOURCE_LOCKED, BasicCancelOk, BasicQos, BasicQosOk
from coolamqp.framing.frames import AMQPBodyFrame, AMQPHeaderFrame
from coolamqp.objects import Callable
from coolamqp.argumentify import argumentify
from coolamqp.uplink import HeaderOrBodyWatch, MethodWatch
logger = logging.getLogger(__name__)
@@ -59,7 +60,7 @@ class Consumer(Channeler):
go.
WARNING: READ DEFAULT VALUES IN CONSTRUCTOR! TAKE CARE WHAT YOUR CONSUMERS
DO!
DO!
You can subscribe to be informed when the consumer is cancelled (for any
reason, server or client side) with:
@@ -98,23 +99,30 @@ class Consumer(Channeler):
If the consumer doesn't get the chance to be declared - because
of a connection fail - next reconnect will consider this to be
SECOND declaration, ie. it will retry ad infinitum
.. deprecated:: v2.0.1
Use normal reconnects, for fuck's sake!
:type fail_on_first_time_resource_locked: bool
:param body_receive_mode: how should message.body be received. This
has a performance impact
:type body_receive_mode: a property of BodyReceiveMode
:type body_receive_mode: a property of :classs:`BodyReceiveMode`
:param arguments: a dictionary, extra set of arguments to be provided to RabbitMQ during binding.
Primarily to support streams.
"""
__slots__ = ('queue', 'no_ack', 'on_message', 'cancelled', 'receiver',
'attache_group', 'channel_close_sent', 'qos', 'qos_update_sent',
'future_to_notify', 'future_to_notify_on_dead',
'fail_on_first_time_resource_locked',
'body_receive_mode', 'consumer_tag', 'on_cancel', 'on_broker_cancel',
'hb_watch', 'deliver_watch', 'span')
'hb_watch', 'deliver_watch', 'span', 'arguments')
def __init__(self, queue, on_message, span=None,
no_ack=True, qos=0,
future_to_notify=None,
fail_on_first_time_resource_locked=False,
body_receive_mode=BodyReceiveMode.BYTES
body_receive_mode=BodyReceiveMode.BYTES,
arguments=None
):
"""
Note that if you specify QoS, it is applied before basic.consume is
@@ -125,8 +133,12 @@ class Consumer(Channeler):
self.span = span
self.queue = queue
self.arguments = argumentify(arguments)
self.no_ack = no_ack
if fail_on_first_time_resource_locked:
warnings.warn('This is heavily deprecated and discouraged', DeprecationWarning)
self.on_message = on_message
# consumer?
@@ -428,7 +440,7 @@ class Consumer(Channeler):
self.method_and_watch(
BasicConsume(self.queue.name, self.consumer_tag,
False, self.no_ack, self.queue.exclusive, False,
[]),
self.arguments),
BasicConsumeOk,
self.on_setup
)
Loading