diff --git a/coolamqp/attaches/consumer.py b/coolamqp/attaches/consumer.py index d908b12aa091a95f3916a1977245269216655ad4..e62f99c9b6357b23ce179ab150f6b22f3003daf1 100644 --- a/coolamqp/attaches/consumer.py +++ b/coolamqp/attaches/consumer.py @@ -28,16 +28,17 @@ class BodyReceiveMode(object): # C - copy (copies every byte once) BYTES = 0 # message.body will be a single bytes object - # this will gather frames as memoryviews, and b''.join() them upon receiving last frame + # this will gather frames as memoryviews, and b''.join() them upon + # receiving last frame # this is C MEMORYVIEW = 1 # message.body will be returned as a memoryview object # this is ZC for small messages, and C for multi-frame ones - # think less than 800B, since 2048 is the buffer for socket recv, and an AMQP - # frame (or many frames!) have to fit there + # think less than 800B, since 2048 is the buffer for socket recv, and an + # AMQP frame (or many frames!) have to fit there - LIST_OF_MEMORYVIEW = 2 # message.body will be returned as list of memoryview objects - # these constitute received pieces. this is always ZC + LIST_OF_MEMORYVIEW = 2 # message.body will be returned as list of + # memoryview objects these constitute received pieces. this is always ZC class Consumer(Channeler): @@ -52,14 +53,16 @@ class Consumer(Channeler): on_start will be called. This means that broker has confirmed that this consumer is operational and receiving messages. - Note that does not attempt to cancel consumers, or any of such nonsense. Having - a channel per consumer gives you the unique possibility of simply closing the channel. - Since this implies cancelling the consumer, here you go. + Note that does not attempt to cancel consumers, or any of such nonsense. + Having a channel per consumer gives you the unique possibility of simply + closing the channel. Since this implies cancelling the consumer, here you + go. - WARNING: READ DEFAULT VALUES IN CONSTRUCTOR! TAKE CARE WHAT YOUR CONSUMERS DO! + WARNING: READ DEFAULT VALUES IN CONSTRUCTOR! TAKE CARE WHAT YOUR CONSUMERS + DO! - You can subscribe to be informed when the consumer is cancelled (for any reason, - server or client side) with: + You can subscribe to be informed when the consumer is cancelled (for any + reason, server or client side) with: con, fut = Cluster.consume(...) @@ -69,7 +72,8 @@ class Consumer(Channeler): con.on_cancel.add(im_called_on_cancel_for_any_reason) con.cancel() - Or, if RabbitMQ is in use, you can be informed upon a Consumer Cancel Notification: + Or, if RabbitMQ is in use, you can be informed upon a Consumer Cancel + Notification: con.on_broker_cancel.add(im_cancelled_by_broker) @@ -82,31 +86,38 @@ class Consumer(Channeler): body_receive_mode=BodyReceiveMode.BYTES ): """ - Note that if you specify QoS, it is applied before basic.consume is sent. This will prevent - the broker from hammering you into oblivion with a mountain of messages. + Note that if you specify QoS, it is applied before basic.consume is + sent. This will prevent the broker from hammering you into oblivion + with a mountain of messages. :param queue: Queue object, being consumed from right now. Note that name of anonymous queue might change at any time! :param on_message: callable that will process incoming messages :type on_message: callable(ReceivedMessage instance) :param no_ack: Will this consumer require acknowledges from messages? - :param qos: a tuple of (prefetch size, prefetch window) for this consumer, or an int (prefetch window only) - If an int is passed, prefetch size will be set to 0 (which means undefined), and this int - will be used for prefetch window + :param qos: a tuple of (prefetch size, prefetch window) for this + consumer, or an int (prefetch window only). + If an int is passed, prefetch size will be set to 0 (which means + undefined), and this int will be used for prefetch window :type qos: tuple(int, int) or tuple(None, int) or int - :param cancel_on_failure: Consumer will cancel itself when link goes down + :param cancel_on_failure: Consumer will cancel itself when link goes + down :type cancel_on_failure: bool - :param future_to_notify: Future to succeed when this consumer goes online for the first time. - This future can also raise with AMQPError if it fails to. - :param fail_on_first_time_resource_locked: When consumer is declared for the first time, - and RESOURCE_LOCKED is encountered, it will fail the - future with ResourceLocked, and consumer will cancel itself. - By default it will retry until success is made. - If the consumer doesn't get the chance to be declared - because - of a connection fail - next reconnect will consider this to be - SECOND declaration, ie. it will retry ad infinitum + :param future_to_notify: Future to succeed when this consumer goes + online for the first time. + This future can also raise with AMQPError if + it fails to. + :param fail_on_first_time_resource_locked: When consumer is declared + for the first time, and RESOURCE_LOCKED is encountered, it will + fail the future with ResourceLocked, and consumer will cancel + itself. + By default it will retry until success is made. + If the consumer doesn't get the chance to be declared - because + of a connection fail - next reconnect will consider this to be + SECOND declaration, ie. it will retry ad infinitum :type fail_on_first_time_resource_locked: bool - :param body_receive_mode: how should message.body be received. This has a performance impact + :param body_receive_mode: how should message.body be received. This + has a performance impact :type body_receive_mode: BodyReceiveMode.* """ super(Consumer, self).__init__() @@ -117,7 +128,8 @@ class Consumer(Channeler): self.on_message = on_message # private - self.cancelled = False # did the client want to STOP using this consumer? + self.cancelled = False # did the client want to STOP using this + # consumer? self.receiver = None # MessageReceiver instance self.attache_group = None # attache group this belongs to. @@ -129,7 +141,8 @@ class Consumer(Channeler): self.future_to_notify = future_to_notify self.future_to_notify_on_dead = None # .cancel - self.fail_on_first_time_resource_locked = fail_on_first_time_resource_locked + self.fail_on_first_time_resource_locked = \ + fail_on_first_time_resource_locked self.cancel_on_failure = cancel_on_failure self.body_receive_mode = body_receive_mode @@ -138,7 +151,8 @@ class Consumer(Channeler): self.on_cancel = Callable( oneshots=True) #: public, called on cancel for any reason self.on_broker_cancel = Callable( - oneshots=True) #: public, called on Customer Cancel Notification (RabbitMQ) + oneshots=True) #: public, called on Customer Cancel Notification + # (RabbitMQ) def set_qos(self, prefetch_size, prefetch_count): """ @@ -156,7 +170,8 @@ class Consumer(Channeler): Cancel the customer. .ack() or .nack() for messages from this customer will have no effect. - :return: a Future to tell when it's done. The future will always succeed - sooner, or later. + :return: a Future to tell when it's done. The future will always + succeed - sooner, or later. NOTE: Future is OK'd when entire channel is destroyed """ @@ -169,7 +184,8 @@ class Consumer(Channeler): self.cancelled = True self.on_cancel() - # you'll blow up big next time you try to use this consumer if you can't cancel, but just close + # you'll blow up big next time you try to use this consumer if you + # can't cancel, but just close if self.consumer_tag is not None: self.method(BasicCancel(self.consumer_tag, False)) else: @@ -203,9 +219,11 @@ class Consumer(Channeler): Handle closing the channel. It sounds like an exception... This is done in two steps: - 1. self.state <- ST_OFFLINE, on_event(EV_OFFLINE) upon detecting that no more messages will + 1. self.state <- ST_OFFLINE, on_event(EV_OFFLINE) upon detecting + that no more messages will be there - 2. self.channel_id <- None, channel is returned to Connection - channel has been physically torn down + 2. self.channel_id <- None, channel is returned to Connection - c + hannel has been physically torn down Note, this can be called multiple times, and eventually with None. @@ -249,16 +267,20 @@ class Consumer(Channeler): rc = payload.reply_code if rc == RESOURCE_LOCKED: # special handling - # This is because we might be reconnecting, and the broker doesn't know yet that we are dead. - # it won't release our exclusive channels, and that's why we'll get RESOURCE_LOCKED. + # This is because we might be reconnecting, and the broker + # doesn't know yet that we are dead. + # it won't release our exclusive channels, and that's why + # we'll get RESOURCE_LOCKED. if self.fail_on_first_time_resource_locked: - # still, a RESOURCE_LOCKED on a first declaration ever suggests something is very wrong + # still, a RESOURCE_LOCKED on a first declaration ever + # suggests something is very wrong self.cancelled = True self.on_cancel() else: # Do not notify the user, and retry at will. - # Do not zero the future - we will need to later confirm it, so it doesn't leak. + # Do not zero the future - we will need to later confirm + # it, so it doesn't leak. should_retry = True if self.future_to_notify: