From 0e20943c290fe0e72dabf6f7ca18d6217056ad41 Mon Sep 17 00:00:00 2001
From: Piotr Maslanka <piotr.maslanka@henrietta.com.pl>
Date: Sun, 8 Oct 2017 07:05:36 +0200
Subject: [PATCH] LINES

---
 coolamqp/attaches/consumer.py | 100 +++++++++++++++++++++-------------
 1 file changed, 61 insertions(+), 39 deletions(-)

diff --git a/coolamqp/attaches/consumer.py b/coolamqp/attaches/consumer.py
index d908b12..e62f99c 100644
--- a/coolamqp/attaches/consumer.py
+++ b/coolamqp/attaches/consumer.py
@@ -28,16 +28,17 @@ class BodyReceiveMode(object):
     # C - copy (copies every byte once)
 
     BYTES = 0  # message.body will be a single bytes object
-    # this will gather frames as memoryviews, and b''.join() them upon receiving last frame
+    # this will gather frames as memoryviews, and b''.join() them upon
+    # receiving last frame
     # this is C
 
     MEMORYVIEW = 1  # message.body will be returned as a memoryview object
     # this is ZC for small messages, and C for multi-frame ones
-    # think less than 800B, since 2048 is the buffer for socket recv, and an AMQP
-    # frame (or many frames!) have to fit there
+    # think less than 800B, since 2048 is the buffer for socket recv, and an
+    # AMQP frame (or many frames!) have to fit there
 
-    LIST_OF_MEMORYVIEW = 2  # message.body will be returned as list of memoryview objects
-    # these constitute received pieces. this is always ZC
+    LIST_OF_MEMORYVIEW = 2  # message.body will be returned as list of
+    # memoryview objects these constitute received pieces. this is always ZC
 
 
 class Consumer(Channeler):
@@ -52,14 +53,16 @@ class Consumer(Channeler):
     on_start will be called. This means that broker has confirmed that this
     consumer is operational and receiving messages.
 
-    Note that does not attempt to cancel consumers, or any of such nonsense. Having
-    a channel per consumer gives you the unique possibility of simply closing the channel.
-    Since this implies cancelling the consumer, here you go.
+    Note that does not attempt to cancel consumers, or any of such nonsense.
+    Having a channel per consumer gives you the unique possibility of simply
+    closing  the channel. Since this implies cancelling the consumer, here you
+    go.
 
-    WARNING: READ DEFAULT VALUES IN CONSTRUCTOR! TAKE CARE WHAT YOUR CONSUMERS DO!
+    WARNING: READ DEFAULT VALUES IN CONSTRUCTOR! TAKE CARE WHAT YOUR CONSUMERS
+     DO!
 
-    You can subscribe to be informed when the consumer is cancelled (for any reason,
-    server or client side) with:
+    You can subscribe to be informed when the consumer is cancelled (for any
+    reason, server or client side) with:
 
         con, fut = Cluster.consume(...)
 
@@ -69,7 +72,8 @@ class Consumer(Channeler):
         con.on_cancel.add(im_called_on_cancel_for_any_reason)
         con.cancel()
 
-    Or, if RabbitMQ is in use, you can be informed upon a Consumer Cancel Notification:
+    Or, if RabbitMQ is in use, you can be informed upon a Consumer Cancel
+    Notification:
 
         con.on_broker_cancel.add(im_cancelled_by_broker)
 
@@ -82,31 +86,38 @@ class Consumer(Channeler):
                  body_receive_mode=BodyReceiveMode.BYTES
                  ):
         """
-        Note that if you specify QoS, it is applied before basic.consume is sent. This will prevent
-        the broker from hammering you into oblivion with a mountain of messages.
+        Note that if you specify QoS, it is applied before basic.consume is
+        sent. This will prevent the broker from hammering you into oblivion
+        with a mountain of messages.
 
         :param queue: Queue object, being consumed from right now.
             Note that name of anonymous queue might change at any time!
         :param on_message: callable that will process incoming messages
         :type on_message: callable(ReceivedMessage instance)
         :param no_ack: Will this consumer require acknowledges from messages?
-        :param qos: a tuple of (prefetch size, prefetch window) for this consumer, or an int (prefetch window only)
-            If an int is passed, prefetch size will be set to 0 (which means undefined), and this int
-            will be used for prefetch window
+        :param qos: a tuple of (prefetch size, prefetch window) for this
+            consumer, or an int (prefetch window only).
+            If an int is passed, prefetch size will be set to 0 (which means
+            undefined), and this int will be used for prefetch window
         :type qos: tuple(int, int) or tuple(None, int) or int
-        :param cancel_on_failure: Consumer will cancel itself when link goes down
+        :param cancel_on_failure: Consumer will cancel itself when link goes
+            down
         :type cancel_on_failure: bool
-        :param future_to_notify: Future to succeed when this consumer goes online for the first time.
-                                 This future can also raise with AMQPError if it fails to.
-        :param fail_on_first_time_resource_locked: When consumer is declared for the first time,
-                                                   and RESOURCE_LOCKED is encountered, it will fail the
-                                                   future with ResourceLocked, and consumer will cancel itself.
-                                                   By default it will retry until success is made.
-                                                   If the consumer doesn't get the chance to be declared - because
-                                                   of a connection fail - next reconnect will consider this to be
-                                                   SECOND declaration, ie. it will retry ad infinitum
+        :param future_to_notify: Future to succeed when this consumer goes
+                                 online for the first time.
+                                 This future can also raise with AMQPError if
+                                 it fails to.
+        :param fail_on_first_time_resource_locked: When consumer is declared
+            for the first time, and RESOURCE_LOCKED is encountered, it will
+            fail the future with ResourceLocked, and consumer will cancel
+            itself.
+            By default it will retry until success is made.
+            If the consumer doesn't get the chance to be declared - because
+            of a connection fail - next reconnect will consider this to be
+            SECOND declaration, ie. it will retry ad infinitum
         :type fail_on_first_time_resource_locked: bool
-        :param body_receive_mode: how should message.body be received. This has a performance impact
+        :param body_receive_mode: how should message.body be received. This
+            has a performance impact
         :type body_receive_mode: BodyReceiveMode.*
         """
         super(Consumer, self).__init__()
@@ -117,7 +128,8 @@ class Consumer(Channeler):
         self.on_message = on_message
 
         # private
-        self.cancelled = False  # did the client want to STOP using this consumer?
+        self.cancelled = False  # did the client want to STOP using this
+                                # consumer?
         self.receiver = None  # MessageReceiver instance
 
         self.attache_group = None  # attache group this belongs to.
@@ -129,7 +141,8 @@ class Consumer(Channeler):
         self.future_to_notify = future_to_notify
         self.future_to_notify_on_dead = None  # .cancel
 
-        self.fail_on_first_time_resource_locked = fail_on_first_time_resource_locked
+        self.fail_on_first_time_resource_locked = \
+            fail_on_first_time_resource_locked
         self.cancel_on_failure = cancel_on_failure
         self.body_receive_mode = body_receive_mode
 
@@ -138,7 +151,8 @@ class Consumer(Channeler):
         self.on_cancel = Callable(
             oneshots=True)  #: public, called on cancel for any reason
         self.on_broker_cancel = Callable(
-            oneshots=True)  #: public, called on Customer Cancel Notification (RabbitMQ)
+            oneshots=True)  #: public, called on Customer Cancel Notification
+        #  (RabbitMQ)
 
     def set_qos(self, prefetch_size, prefetch_count):
         """
@@ -156,7 +170,8 @@ class Consumer(Channeler):
         Cancel the customer.
 
         .ack() or .nack() for messages from this customer will have no effect.
-        :return: a Future to tell when it's done. The future will always succeed - sooner, or later.
+        :return: a Future to tell when it's done. The future will always
+                 succeed - sooner, or later.
                  NOTE: Future is OK'd when entire channel is destroyed
         """
 
@@ -169,7 +184,8 @@ class Consumer(Channeler):
 
         self.cancelled = True
         self.on_cancel()
-        # you'll blow up big next time you try to use this consumer if you can't cancel, but just close
+        # you'll blow up big next time you try to use this consumer if you
+        # can't cancel, but just close
         if self.consumer_tag is not None:
             self.method(BasicCancel(self.consumer_tag, False))
         else:
@@ -203,9 +219,11 @@ class Consumer(Channeler):
         Handle closing the channel. It sounds like an exception...
 
         This is done in two steps:
-        1. self.state <- ST_OFFLINE, on_event(EV_OFFLINE)   upon detecting that no more messages will
+        1. self.state <- ST_OFFLINE, on_event(EV_OFFLINE)   upon detecting
+           that no more messages will
            be there
-        2. self.channel_id <- None, channel is returned to Connection - channel has been physically torn down
+        2. self.channel_id <- None, channel is returned to Connection - c
+           hannel has been physically torn down
 
         Note, this can be called multiple times, and eventually with None.
 
@@ -249,16 +267,20 @@ class Consumer(Channeler):
             rc = payload.reply_code
             if rc == RESOURCE_LOCKED:
                 # special handling
-                # This is because we might be reconnecting, and the broker doesn't know yet that we are dead.
-                # it won't release our exclusive channels, and that's why we'll get RESOURCE_LOCKED.
+                # This is because we might be reconnecting, and the broker
+                # doesn't know yet that we are dead.
+                # it won't release our exclusive channels, and that's why
+                # we'll get RESOURCE_LOCKED.
 
                 if self.fail_on_first_time_resource_locked:
-                    # still, a RESOURCE_LOCKED on a first declaration ever suggests something is very wrong
+                    # still, a RESOURCE_LOCKED on a first declaration ever
+                    # suggests something is very wrong
                     self.cancelled = True
                     self.on_cancel()
                 else:
                     # Do not notify the user, and retry at will.
-                    # Do not zero the future - we will need to later confirm it, so it doesn't leak.
+                    # Do not zero the future - we will need to later confirm
+                    # it, so it doesn't leak.
                     should_retry = True
 
             if self.future_to_notify:
-- 
GitLab