From f29f4b3ec0a4a66bae351ad607437e75cb8e88ae Mon Sep 17 00:00:00 2001
From: Piotr Maslanka <piotr.maslanka@henrietta.com.pl>
Date: Sun, 8 Jan 2017 01:11:17 +0100
Subject: [PATCH] a useful appliance for consumer/publisher.

a class that allows tracking delivery tags in AMQP-compliant way
---
 .gitignore                           |   2 +-
 README.md                            |   5 +
 coolamqp/attaches/__init__.py        |   3 +-
 coolamqp/attaches/channeler.py       | 177 ++++++++++++++++
 coolamqp/attaches/consumer.py        | 174 ++++------------
 coolamqp/attaches/publisher.py       | 146 +++++++++++++
 coolamqp/attaches/utils.py           | 209 +++++++++++++++++++
 coolamqp/backends/__init__.py        |   5 +-
 coolamqp/backends/base.py            |   4 -
 coolamqp/backends/coolamqp.py        | 114 +++++++++++
 coolamqp/cluster.py                  | 240 ----------------------
 coolamqp/connection/definition.py    |  53 +++--
 coolamqp/handler.py                  | 294 ---------------------------
 coolamqp/messages.py                 |  19 +-
 coolamqp/uplink/__init__.py          |   1 +
 coolamqp/uplink/handshake.py         |  12 +-
 coolamqp/uplink/listener/__init__.py |   1 -
 tests/run.py                         |  10 +-
 tests/test_attaches/__init__.py      |   8 +
 tests/test_attaches/test_utils.py    |  62 ++++++
 20 files changed, 828 insertions(+), 711 deletions(-)
 create mode 100644 coolamqp/attaches/channeler.py
 create mode 100644 coolamqp/attaches/publisher.py
 create mode 100644 coolamqp/attaches/utils.py
 create mode 100644 coolamqp/backends/coolamqp.py
 delete mode 100644 coolamqp/cluster.py
 delete mode 100644 coolamqp/handler.py
 create mode 100644 tests/test_attaches/__init__.py
 create mode 100644 tests/test_attaches/test_utils.py

diff --git a/.gitignore b/.gitignore
index 806e67f..a9dc423 100644
--- a/.gitignore
+++ b/.gitignore
@@ -6,7 +6,7 @@ __pycache__/
 
 # C extensions
 *.so
-
+.pycharm_helpers/
 # Distribution / packaging
 .Python
 env/
diff --git a/README.md b/README.md
index ec4bf1b..5d34751 100644
--- a/README.md
+++ b/README.md
@@ -18,3 +18,8 @@ The project is actively maintained and used in a commercial project. Tests can r
 either on Vagrant (Vagrantfile attached) or Travis CI, and run against RabbitMQ.
 
 Enjoy!
+
+
+## Notes
+Assertions are sprinkled throughout the code. You may wish to run with optimizations enabled
+if you need every CPU cycle you can get.
\ No newline at end of file
diff --git a/coolamqp/attaches/__init__.py b/coolamqp/attaches/__init__.py
index 7d31da1..9475c8e 100644
--- a/coolamqp/attaches/__init__.py
+++ b/coolamqp/attaches/__init__.py
@@ -7,4 +7,5 @@ These duties almost require allocating a channel. The attache becomes then respo
 Attache should also register at least one on_fail watch, so it can handle things if they go south.
 """
 
-from coolamqp.attaches.consumer import Consumer
\ No newline at end of file
+from coolamqp.attaches.consumer import Consumer
+from coolamqp.attaches.publisher import Publisher, MODE_NOACK, MODE_CNPUB
diff --git a/coolamqp/attaches/channeler.py b/coolamqp/attaches/channeler.py
new file mode 100644
index 0000000..1b4b439
--- /dev/null
+++ b/coolamqp/attaches/channeler.py
@@ -0,0 +1,177 @@
+# coding=UTF-8
+"""
+Base class for consumer or publisher with the capability to
+set up and tear down channels
+"""
+from __future__ import print_function, absolute_import, division
+import six
+from coolamqp.framing.frames import AMQPMethodFrame, AMQPBodyFrame, AMQPHeaderFrame
+from coolamqp.framing.definitions import ChannelOpen, ChannelOpenOk, BasicConsume, \
+    BasicConsumeOk, QueueDeclare, QueueDeclareOk, ExchangeDeclare, ExchangeDeclareOk, \
+    QueueBind, QueueBindOk, ChannelClose, ChannelCloseOk, BasicCancel, BasicDeliver, \
+    BasicAck, BasicReject, ACCESS_REFUSED, RESOURCE_LOCKED, BasicCancelOk
+from coolamqp.uplink import HeaderOrBodyWatch, MethodWatch
+
+
+
+ST_OFFLINE = 0  # Consumer is *not* consuming, no setup attempts are being made
+ST_SYNCING = 1  # A process targeted at consuming has been started
+ST_ONLINE = 2   # Consumer is declared all right
+
+
+class Channeler(object):
+    """
+    A base class for Consumer/Publisher implementing link set up and tear down.
+
+    A channeler can be essentially in 4 states:
+    - ST_OFFLINE (.channel is None): channel is closed, object is unusable. Requires an attach() to a connection
+                                     that is being established, or open, or whatever. Connection will notify
+                                     this channeler that it's open.
+    - ST_SYNCING: channeler is opening a channel/doing some other things related to its setup.
+                  it's going to be ST_ONLINE soon, or go back to ST_OFFLINE.
+                  It has, for sure, acquired a channel number.
+    - ST_ONLINE:  channeler is operational. It has a channel number and has declared everything
+                  it needs to.
+
+                  on_operational(True) will be called when a transition is made TO this state.
+                  on_operational(False) will be called when a transition is made FROM this state.
+
+    - ST_OFFLINE (.channel is not None): channeler is undergoing a close. It has not yet torn down the channel,
+                                         but ordering it to do anything is pointless, because it will not get done
+                                         until attach() with new connection is called.
+    """
+
+    def __init__(self):
+        """
+        [EXTEND ME!]
+        """
+        self.state = ST_OFFLINE
+        self.connection = None
+        self.channel_id = None      # channel obtained from Connection
+
+    def attach(self, connection):
+        """
+        Attach this object to a live Connection.
+
+        :param connection: Connection instance to use
+        """
+        self.connection = connection
+        connection.call_on_connected(self.on_uplink_established)
+
+    # ------- event handlers
+
+    def on_operational(self, operational):
+        """
+        [EXTEND ME] Called by internal methods (on_*) when channel has achieved (or lost) operational status.
+
+        If this is called with operational=True, then for sure it will be called with operational=False.
+
+        This will, therefore, get called an even number of times.
+
+        :param operational: True if channel has just become operational, False if it has just become useless.
+        """
+
+    def on_close(self, payload=None):
+        """
+        [EXTEND ME] Handler for channeler destruction.
+
+        Called on:
+        - channel exception
+        - connection failing
+
+        This handles following situations:
+        - payload is None: this means that connection has gone down hard, so our Connection object is
+                           probably very dead. Transition to ST_OFFLINE (.channel is None)
+        - payload is a ChannelClose: this means that a channel exception has occurred. Dispatch a ChannelCloseOk,
+                                     attempt to log an exception, transition to ST_OFFLINE (.channel is None)
+        - payload is a ChannelCloseOk: this means that it was us who attempted to close the channel. Return the channel
+                                       to free pool, transition to ST_OFFLINE (.channel is None)
+
+        If you need to handle something else, extend this. Take care that this DOES NOT HANDLE errors that happen
+        while state is ST_SYNCING. You can expect this to handle a full channel close, therefore releasing all
+        resources, so it mostly will do *the right thing*.
+
+        If you need to do something else than just close a channel, please extend or modify as necessary.
+
+        """
+        if self.state == ST_ONLINE:
+            # The channel has just lost operationality!
+            self.on_operational(False)
+        self.state = ST_OFFLINE
+
+        if payload is None:
+            # Connection went down HARD
+            self.connection.free_channels.put(self.channel_id)
+            self.channel_id = None
+        elif isinstance(payload, ChannelClose):
+            # We have failed
+            print('Channel close: RC=%s RT=%s', payload.reply_code, payload.reply_text)
+            self.connection.free_channels.put(self.channel_id)
+            self.channel_id = None
+
+        elif isinstance(payload, ChannelCloseOk):
+            self.connection.free_channels.put(self.channel_id)
+            self.channel_id = None
+        else:
+            raise Exception('Unrecognized payload - did you forget to handle something? :D')
+
+    def methods(self, payloads):
+        """
+        Syntactic sugar for
+
+            for payload in payloads:
+                self.method(payload)
+
+        But moar performant.
+        """
+        assert self.channel_id is not None
+        frames = [AMQPMethodFrame(self.channel_id, payload) for payload in payloads]
+        self.connection.send(frames)
+
+    def method(self, payload):
+        """
+        Syntactic sugar for:
+
+            self.connection.send([AMQPMethodFrame(self.channel_id, payload)])
+        """
+        self.methods([payload])
+
+    def method_and_watch(self, method_payload, method_classes_to_watch, callable):
+        """
+        Syntactic sugar for
+
+            self.connection.method_and_watch(self.channel_id,
+                                             method_payload,
+                                             method_classes_to_watch,
+                                             callable)
+        """
+        assert self.channel_id is not None
+        self.connection.method_and_watch(self.channel_id, method_payload, method_classes_to_watch, callable)
+
+    def on_setup(self, payload):
+        """
+        [OVERRIDE ME!] Called with a method frame that signifies a part of setup.
+
+        You must be prepared to handle at least a payload of ChannelOpenOk
+
+        :param payload: AMQP method frame payload
+        """
+        raise Exception('Abstract method - override me!')
+
+
+    def on_uplink_established(self):
+        """Called by connection. Connection reports being ready to do things."""
+        self.state = ST_SYNCING
+        self.channel_id = self.connection.free_channels.pop()
+
+        self.connection.watch_for_method(self.channel_id, (ChannelClose, ChannelCloseOk, BasicCancel),
+                                         self.on_close,
+                                         on_fail=self.on_close)
+
+        self.connection.method_and_watch(
+            self.channel_id,
+            ChannelOpen(),
+            ChannelOpenOk,
+            self.on_setup
+        )
+
diff --git a/coolamqp/attaches/consumer.py b/coolamqp/attaches/consumer.py
index 7f628cd..43769cf 100644
--- a/coolamqp/attaches/consumer.py
+++ b/coolamqp/attaches/consumer.py
@@ -1,26 +1,17 @@
 # coding=UTF-8
 from __future__ import absolute_import, division, print_function
 import six
-from coolamqp.framing.frames import AMQPMethodFrame, AMQPBodyFrame, AMQPHeaderFrame
-from coolamqp.framing.definitions import ChannelOpen, ChannelOpenOk, BasicConsume, \
+from coolamqp.framing.frames import AMQPBodyFrame, AMQPHeaderFrame
+from coolamqp.framing.definitions import ChannelOpenOk, BasicConsume, \
     BasicConsumeOk, QueueDeclare, QueueDeclareOk, ExchangeDeclare, ExchangeDeclareOk, \
-    QueueBind, QueueBindOk, ChannelClose, ChannelCloseOk, BasicCancel, BasicDeliver, \
+    QueueBind, QueueBindOk, ChannelClose, BasicCancel, BasicDeliver, \
     BasicAck, BasicReject, ACCESS_REFUSED, RESOURCE_LOCKED, BasicCancelOk
 from coolamqp.uplink import HeaderOrBodyWatch, MethodWatch
 
+from coolamqp.attaches.channeler import Channeler, ST_ONLINE, ST_OFFLINE
 
-ST_OFFLINE = 0  # Consumer is *not* consuming, no setup attempts are being made
-ST_SYNCING = 1  # A process targeted at consuming has been started
-ST_ONLINE = 2   # Consumer is declared all right
 
-
-EV_ONLINE = 7   # called upon consumer being online and consuming
-EV_CANCEL = 8   # consumer has been cancelled by BasicCancel.
-                # EV_OFFLINE will follow immediately
-EV_OFFLINE = 9  # channel down
-EV_MESSAGE = 10  # received a message
-
-class Consumer(object):
+class Consumer(Channeler):
     """
     This object represents a consumer in the system.
 
@@ -37,14 +28,6 @@ class Consumer(object):
     Since this implies cancelling the consumer, here you go.
     """
 
-    def attach(self, connection):
-        """
-        Attach this consumer to a connection
-        :param connection: coolamqp.framing.Connection
-        """
-        self.connection = connection
-        connection.call_on_connected(self.on_uplink_established)
-
     def __init__(self, queue, no_ack=True, qos=None, dont_pause=False):
         """
         To be instantiated only by Cluster
@@ -55,50 +38,26 @@ class Consumer(object):
         :param no_ack: Will this consumer require acknowledges from messages?
         :param dont_pause: Consumer will fail on the spot instead of pausing
         """
-        self.state = ST_OFFLINE
+        super(Consumer, self).__init__()
+
         self.queue = queue
         self.no_ack = no_ack
 
         # private
-        self.connection = None  # broker on which was last seen
-        self.channel_id = None
         self.cancelled = False  # did the client want to STOP using this consumer?
         self.receiver = None  # MessageReceiver instance
 
 
-    def on_event(self, event, arg=None):
-        """
-        Called upon events arriving. Possible events are:
-        arg={EV_ONLINE} called directly after setup. self.state is not yet set!
-        args={EV_CANCEL} seen a RabbitMQ consumer cancel notify. EV_OFFLINE follows
-        args={EV_OFFLINE} sent when a channel is no longer usable. It may not yet have been torn down.
-                          this will be called only if EV_ONLINE was previously dispatched
-        arg={EV_MESSAGE}
-        """
-        assert event in (EV_OFFLINE, EV_CANCEL, EV_MESSAGE, EV_ONLINE)
+    def on_operational(self, operational):
+        super(Consumer, self).on_operational(operational)
 
-        if event == EV_OFFLINE and (self.state is not ST_ONLINE):
-            return # No point in processing that
-
-        if event == EV_ONLINE:
-            print('Entering ST_ONLINE')
-            self.state = ST_ONLINE
+        if operational:
             assert self.receiver is None
             self.receiver = MessageReceiver(self)
-
-        elif event == EV_OFFLINE:
+        else:
             self.receiver.on_gone()
             self.receiver = None
 
-    def __stateto(self, st):
-        """if st is not current state, statify it.
-        As an extra if it's a transition to ST_OFFLINE, send an event"""
-        if (self.state == ST_ONLINE) and (st == ST_OFFLINE):
-            # the only moment when EV_OFFLINE is generated
-            self.on_event(EV_OFFLINE)
-
-        self.state = st
-
     def on_close(self, payload=None):
         """
         Handle closing the channel. It sounds like an exception...
@@ -108,44 +67,30 @@ class Consumer(object):
            be there
         2. self.channel_id <- None, channel is returned to Connection - channel has been physically torn down
         """
-        should_retry = False
-        release_channel = False
+        if self.state == ST_ONLINE:
+            # The channel has just lost operationality!
+            self.on_operational(False)
+        self.state = ST_OFFLINE
 
-        print('HAYWIRE ', payload)
+        should_retry = False
 
         if isinstance(payload, BasicCancel):
             # Consumer Cancel Notification - by RabbitMQ
-            self.on_event(EV_CANCEL)
-            self.__stateto(ST_OFFLINE)
-            self.connection.send([AMQPMethodFrame(self.channel_id, BasicCancelOk()),
-                                  AMQPMethodFrame(self.channel_id, ChannelClose(0, b'Received basic.cancel', 0, 0))])
-            return
-
-        if isinstance(payload, BasicCancelOk):
-            self.__stateto(ST_OFFLINE)
-            # proceed with teardown
-            self.connection.send([AMQPMethodFrame(self.channel_id, ChannelClose(0, b'Received basic.cancel-ok', 0, 0))])
-            return
-
-        # at this point this can be only ChannelClose, ChannelCloseOk or on_fail
-        # release the kraken
-
-        if isinstance(payload, ChannelClose):
-            # it sounds like an exception
-            self.__stateto(ST_OFFLINE)
-            print(payload.reply_code, payload.reply_text)
-            self.connection.send([AMQPMethodFrame(self.channel_id, ChannelCloseOk())])
-            should_retry = payload.reply_code in (ACCESS_REFUSED, RESOURCE_LOCKED)
-
-        if self.channel_id is not None:
-            self.__stateto(ST_OFFLINE)
-            self.connection.unwatch_all(self.channel_id)
-            self.connection.free_channels.append(self.channel_id)
-            self.channel_id = None
-            self.remaining_for_ack = set()
+            self.methods([BasicCancelOk(), ChannelClose(0, b'Received basic.cancel', 0, 0)])
+
+        elif isinstance(payload, BasicCancelOk):
+            # OK, our cancelling went just fine - proceed with teardown
+            self.method(ChannelClose(0, b'Received basic.cancel-ok', 0, 0))
+
+        elif isinstance(payload, ChannelClose):
+            if payload.reply_code in (ACCESS_REFUSED, RESOURCE_LOCKED):
+                should_retry = True
+            super(Consumer, self).on_close(payload)
+        else:
+            super(Consumer, self).on_close(payload)
 
         if should_retry:
-            self.on_uplink_established() # retry
+            self.attach(self.connection)
 
     def on_delivery(self, sth):
         """
@@ -164,12 +109,6 @@ class Consumer(object):
     def on_setup(self, payload):
         """Called with different kinds of frames - during setup"""
 
-        if self.cancelled:
-            # We were declaring this, but during this situation this
-            # consumer was cancelled. Close the channel and things.
-            self.connection.send(self.channel_id, ChannelClose(0, 'Consumer cancelled', 0, 0))
-            return
-
         if isinstance(payload, ChannelOpenOk):
             # Do we need to declare the exchange?
 
@@ -217,15 +156,10 @@ class Consumer(object):
 
             # We need any form of binding.
             if self.queue.exchange is not None:
-                self.connection.method_and_watch(
-                    self.channel_id,
+                self.method_and_watch(
                     QueueBind(
-                        self.queue.name.encode('utf8'),
-                        self.queue.exchange.name.encode('utf8'),
-                        b'',
-                        False,
-                        []
-                    ),
+                        self.queue.name.encode('utf8'), self.queue.exchange.name.encode('utf8'),
+                        b'', False, []),
                     QueueBindOk,
                     self.on_setup
                 )
@@ -234,24 +168,15 @@ class Consumer(object):
                 self.on_setup(QueueBindOk())
         elif isinstance(payload, QueueBindOk):
             # itadakimasu
-            self.connection.method_and_watch(
-                self.channel_id,
-                BasicConsume(
-                    self.queue.name.encode('utf8'),
-                    self.queue.name.encode('utf8'),
-                    False,
-                    self.no_ack,
-                    self.queue.exclusive,
-                    False,
-                    []
-                ),
+            self.method_and_watch(
+                BasicConsume(self.queue.name.encode('utf8'), self.queue.name.encode('utf8'),
+                    False, self.no_ack, self.queue.exclusive, False, []),
                 BasicConsumeOk,
                 self.on_setup
             )
 
         elif isinstance(payload, BasicConsumeOk):
-            # AWWW RIGHT~!!!
-            self.state = ST_ONLINE
+            # AWWW RIGHT~!!! We're good.
 
             # Register watches for receiving shit
             self.connection.watch(HeaderOrBodyWatch(self.channel_id, self.on_delivery))
@@ -259,27 +184,10 @@ class Consumer(object):
             mw.oneshot = False
             self.connection.watch(mw)
 
-            self.on_event(EV_ONLINE)
-
-    def on_uplink_established(self):
-        """Consumer was created or uplink was regained. Try to declare it"""
-        if self.cancelled:
-            return  # it's OK.
-
-        self.state = ST_SYNCING
-        self.channel_id = self.connection.free_channels.pop()
+            self.state = ST_ONLINE
+            self.on_operational(True)
 
-        self.connection.watch_for_method(self.channel_id,
-                                         (ChannelClose, ChannelCloseOk, BasicCancel),
-                                         self.on_close,
-                                         on_fail=self.on_close)
 
-        self.connection.method_and_watch(
-            self.channel_id,
-            ChannelOpen(),
-            ChannelOpenOk,
-            self.on_setup
-        )
 
 
 
@@ -335,11 +243,9 @@ class MessageReceiver(object):
                 return  # already confirmed/rejected
 
             if success:
-                self.consumer.connection.send([AMQPMethodFrame(self.consumer.channel_id,
-                                                               BasicAck(delivery_tag, False))])
+                self.consumer.method(BasicAck(delivery_tag, False))
             else:
-                self.consumer.connection.send([AMQPMethodFrame(self.consumer.channel_id,
-                                                               BasicReject(delivery_tag, True))])
+                self.consumer.method(BasicReject(delivery_tag, True))
 
         return callable
 
diff --git a/coolamqp/attaches/publisher.py b/coolamqp/attaches/publisher.py
new file mode 100644
index 0000000..6dc75e9
--- /dev/null
+++ b/coolamqp/attaches/publisher.py
@@ -0,0 +1,146 @@
+# coding=utf-8
+"""
+Module used to publish messages.
+
+Expect wild NameErrors if you build this without RabbitMQ extensions (enabled by default),
+and try to use MODE_CNPUB.
+
+If you use a broker that doesn't support these, just don't use MODE_CNPUB. CoolAMQP is smart enough
+to check with the broker beforehand.
+"""
+from __future__ import absolute_import, division, print_function
+import six
+import warnings
+import logging
+import collections
+from coolamqp.framing.definitions import ChannelOpenOk
+
+try:
+    # these extensions will be available
+    from coolamqp.framing.definitions import ConfirmSelect, ConfirmSelectOk
+except ImportError:
+    pass
+
+from coolamqp.attaches.channeler import Channeler, ST_ONLINE, ST_OFFLINE
+from coolamqp.uplink import PUBLISHER_CONFIRMS
+
+
+logger = logging.getLogger(__name__)
+
+MODE_NOACK = 0
+MODE_CNPUB = 1  # this will be available only if suitable extensions were used
+
+
+class Publisher(Channeler):
+    """
+    An object that is capable of sucking into a Connection and sending messages.
+    Depending on its characteristics, it may process messages in:
+
+        - non-ack mode (default) - messages will be dropped on the floor if there is no active uplink
+        - Consumer Publish mode - requires broker support, each message will be ACK/NACKed by the broker
+                                  messages will survive broker reconnections.
+
+                                  If you support this, it is your job to ensure that broker supports
+                                  publisher_confirms. If it doesn't, this publisher will enter ST_OFFLINE
+                                  and emit a warning.
+
+        Other modes may be added in the future.
+    """
+
+    def __init__(self, mode):
+        """
+        Create a new publisher
+        :param mode: Publishing mode to use. One of:
+                         MODE_NOACK - use non-ack mode
+                         MODE_CNPUB - use consumer publishing mode. TypeError will be raised when this publisher
+                                      is attached to a consumer that doesn't have consumer publishes negotiated
+        :type mode: MODE_NOACK or MODE_CNPUB
+        :raise ValueError: mode invalid
+        """
+        super(Publisher, self).__init__()
+        if mode not in (MODE_NOACK, MODE_CNPUB):
+            raise ValueError(u'Invalid publisher mode')
+
+        self.mode = mode
+
+        self.messages = collections.deque() # Messages to publish. From newest to last.
+                                            # tuple of (Message object, exchange name::str, routing_key::str,
+                                            #           Future to confirm or None, flags as tuple|empty tuple
+
+        self.delivery_tag = 0       # next delivery tag
+
+
+    def publish(self, message, exchange_name=b'', routing_key=b''):
+        """
+        Schedule to have a message published.
+
+        :param message: Message object to send
+        :param exchange_name: exchange name to use; the default (direct) exchange is used when empty
+        :param routing_key: routing key to use
+        :return: a Future object symbolizing delivering the message to AMQP (or any else guarantees publisher mode
+            will make).
+            This is None when mode is noack
+        """
+        # Formulate the request
+        if self.mode == MODE_NOACK:
+
+            # If we are not connected right now, drop the message on the floor and log it with DEBUG
+            if self.state != ST_ONLINE:
+                logger.debug(u'Publish request, but not connected - dropping the message')
+            else:
+                # Dispatch!
+                pass
+
+
+
+            self.messages.append((
+                message,
+                exchange_name,
+                routing_key,
+                None,
+                ()
+            ))
+        else:
+            fut = u'banana banana banana'
+            self.messages.append((
+                message,
+                exchange_name,
+                routing_key,
+                fut
+            ))
+            return fut
+
+        # Attempt dispatching messages as possible
+        if self.mode == MODE_NOACK:
+            pass
+
+    def on_setup(self, payload):
+
+        # Assert that mode is OK
+        if self.mode == MODE_CNPUB:
+            if PUBLISHER_CONFIRMS not in self.connection.extensions:
+                warnings.warn(u'Broker does not support publisher_confirms, refusing to start publisher',
+                              RuntimeWarning)
+                self.state = ST_OFFLINE
+                return
+
+        if isinstance(payload, ChannelOpenOk):
+            # Ok, if this has a mode different from MODE_NOACK, we need to additionally set up
+            # the functionality.
+
+            if self.mode == MODE_CNPUB:
+                self.method_and_watch(ConfirmSelect(False), ConfirmSelectOk, self.on_setup)
+            elif self.mode == MODE_NOACK:
+                # A-OK! Boot it.
+                self.state = ST_ONLINE
+                self.on_operational(True)
+
+        elif self.mode == MODE_CNPUB:
+            # Because only in this case it makes sense to check for MODE_CNPUB
+            if isinstance(payload, ConfirmSelectOk):
+                # A-OK! Boot it.
+                self.state = ST_ONLINE
+                self.on_operational(True)
+
+
+
diff --git a/coolamqp/attaches/utils.py b/coolamqp/attaches/utils.py
new file mode 100644
index 0000000..25dd9c6
--- /dev/null
+++ b/coolamqp/attaches/utils.py
@@ -0,0 +1,209 @@
+# coding=UTF-8
+from __future__ import print_function, absolute_import, division
+import six
+import logging
+import threading
+
+logger = logging.getLogger(__name__)
+
+
+class ConfirmableRejectable(object):
+    """
+    Protocol for objects put into AtomicTagger. You need not subclass it,
+    just support this protocol.
+    """
+
+    def confirm(self):
+        """
+        This has been ACK'd
+        :return: don't care
+        """
+
+    def reject(self):
+        """
+        This has been REJECT'd/NACK'd
+        :return: don't care
+        """
+
+class ManualConfirmableRejectable(ConfirmableRejectable):
+    """
+    A callback-based way to create ConfirmableRejectable objects
+    """
+    def __init__(self, on_ack, on_nack):
+        """
+        :param on_ack: callable/0, will be called on .confirm
+        :param on_nack: callable/0, will be called on .reject
+        """
+        self.on_ack = on_ack
+        self.on_nack = on_nack
+
+    def confirm(self):
+        self.on_ack()
+
+    def reject(self):
+        self.on_nack()
+
+
+class AtomicTagger(object):
+    """
+    This implements a thread-safe dictionary of (integer=>ConfirmableRejectable | None),
+    used for processing delivery tags / (negative) acknowledgements.
+        - you can requisition a key. This key belongs only to you, and the whole world
+          doesn't know you have it.
+
+            delivery_tag_to_use = tagger.get_key()
+
+        - you can deposit a ConfirmableRejectable into the  tagger.
+
+            tagger.deposit(delivery_tag, message)
+
+         After you do so, this tag is subject to be acked/nacked. Read on.
+
+        - you can (multiple)(ack/nack) messages. This corresponds to multiple bit
+          used in basic.ack/basic.nack.
+
+          If this is done, your message objects (that MUST implement the
+          ConfirmableRejectable protocol) will have respective methods called.
+          These methods MUST NOT depend on particular state of locking by this
+          object.
+
+    Thread safety is implemented using reentrant locking. The lock object is a
+    threading.RLock, and you can access it at atomicTagger.lock.
+
+    Please note that delivery tags are increasing non-negative integers.
+    Therefore, X>Y implies that sending/receiving X happened after Y.
+
+    Note that key/delivery_tag of 0 has special meaning of "everything so far".
+
+    This has to be fast for most common cases. Corner cases will be resolved correctly,
+    but maybe not fast.
+    """
+
+    def __init__(self):
+        self.lock = threading.RLock()
+
+        # Protected by lock
+        self.next_tag = 1       # 0 is AMQP-reserved to mean "everything so far"
+        self.tags = []  # a list of (tag, ConfirmableRejectable)
+                        # they remain to be acked/nacked
+                        # invariant: FOR EACH i, j: (i>j) => (tags[i][0] > tags[j][0])
+
+    def deposit(self, tag, obj):
+        """
+        Put a tag into the tag list.
+
+        Putting the same tag more than one time will result in undefined behaviour.
+
+        :param tag: non-negative integer
+        :param obj: ConfirmableRejectable
+                    if you put something that isn't a ConfirmableRejectable, you won't get bitten
+                    until you call .ack() or .nack().
+        """
+        assert tag >= 0
+        opt = (tag, obj)
+
+        with self.lock:
+            if len(self.tags) == 0:
+                self.tags.append(opt)
+            elif self.tags[-1][0] < tag:
+                self.tags.append(opt)
+            else:
+                # Insert a value at place where it makes sense. Iterate from the end, because
+                # values will usually land there...
+                i = len(self.tags) - 1 # start index
+
+                while i>0:  # this will terminate at i=0
+                    if self.tags[i][0] > tag: # this means we should insert it here...
+                        break
+                    i -= 1  # previous index
+
+                self.tags.insert(i, opt)
+
+    def __acknack(self, tag, multiple, ack):
+        """
+        :param tag: Note that 0 means "everything"
+        :param ack: True to ack, False to nack
+        """
+        # Compute limits - they go from 0 to somewhere
+        with self.lock:
+            start = 0
+            # start and stop will signify the PYTHON SLICE parameters
+
+            if tag > 0:
+
+                if multiple:
+                    # Compute the ranges
+                    for stop, opt in enumerate(self.tags):
+                        if opt[0] == tag:
+                            stop += 1 # this is exactly this tag. Adjust stop to end one further (Python slicing) and stop
+                            break
+                        if opt[0] > tag:
+                            break # We went too far, but it's OK, we don't need to bother with adjusting stop
+                    else:
+                        # List finished without breaking? That would mean the entire range!
+                        stop = len(self.tags)
+                else:
+                    # Just find that piece
+                    for index, opt in enumerate(self.tags):
+                        if opt[0] == tag:
+                            stop = index + 1
+                            break
+                    else:
+                        return  # not found!
+
+
+                if not multiple:
+                    start = stop-1
+            else:
+                # Oh, I know the range!
+                stop = len(self.tags)
+
+            print('Range computed of %s:%s' % (start, stop))
+
+            items = self.tags[start:stop]
+            del self.tags[start:stop]
+
+        for tag, cr in items:
+            if ack:
+                cr.confirm()
+            else:
+                cr.reject()
+
+    def ack(self, tag, multiple):
+        """
+        Acknowledge given objects.
+
+        If multiple, objects UP TO AND INCLUDING tag will have .confirm() called.
+        If it's false, only this precise object will have done so.
+        If this object does not exist, nothing will happen. Acking the same tag more than one time
+        is a no-op.
+
+        Things acked/nacked will be evicted from .data
+        :param tag: delivery tag to use. Note that 0 means "everything so far"
+        """
+        self.__acknack(tag, multiple, True)
+
+    def nack(self, tag, multiple):
+        """
+        Negatively acknowledge (reject) given objects.
+
+        If multiple, objects UP TO AND INCLUDING tag will have .reject() called.
+        If it's false, only this precise object will have done so.
+        If this object does not exist, nothing will happen. Nacking the same tag more than one time
+        is a no-op.
+
+        Things acked/nacked will be evicted from .data
+        :param tag: delivery tag to use. Note that 0 means "everything so far"
+        """
+        self.__acknack(tag, multiple, False)
+
+    def get_key(self):
+        """
+        Return a key. It won't be seen here until you deposit it.
+
+        It's just yours, and you can do whatever you want with it, even drop on the floor.
+        :return: a positive integer
+        """
+        with self.lock:
+            self.next_tag += 1
+            return self.next_tag - 1
diff --git a/coolamqp/backends/__init__.py b/coolamqp/backends/__init__.py
index 77106aa..b7d705d 100644
--- a/coolamqp/backends/__init__.py
+++ b/coolamqp/backends/__init__.py
@@ -1,3 +1,6 @@
 # coding=UTF-8
-from coolamqp.backends.pyamqp import PyAMQPBackend
+from coolamqp.backends.coolamqp import CoolAMQPBackend
 from coolamqp.backends.base import AMQPError, ConnectionFailedError, RemoteAMQPError, Cancelled
+"""
+Backend is a legacy way to access CoolAMQP.
+"""
\ No newline at end of file
diff --git a/coolamqp/backends/base.py b/coolamqp/backends/base.py
index ad1b34f..47e4af6 100644
--- a/coolamqp/backends/base.py
+++ b/coolamqp/backends/base.py
@@ -16,10 +16,6 @@ class ConnectionFailedError(AMQPError):
         return u'ConnectionFailedError("%s")' % map(repr, (self.reply_text, ))
 
 
-class Discarded(Exception):
-    """send() for this message had discard_on_retry"""
-
-
 class Cancelled(Exception):
     """Cancel ordered by user"""
 
diff --git a/coolamqp/backends/coolamqp.py b/coolamqp/backends/coolamqp.py
new file mode 100644
index 0000000..8ca0203
--- /dev/null
+++ b/coolamqp/backends/coolamqp.py
@@ -0,0 +1,114 @@
+# coding=UTF-8
+from __future__ import absolute_import, division, print_function
+
+
+from coolamqp.backends.base import AMQPBackend
+
+
+# Create a global ListenerThread
+from coolamqp.uplink import ListenerThread
+GLOBALS = {
+    'listener': ListenerThread()
+}
+
+
+class CoolAMQPBackend(AMQPBackend):
+    """
+    A backend utilizing CoolAMQP's coolamqp.attaches and coolamqp.connection.
+    Backend starts with creating a connection, and ends with closing it.
+    """
+    def __init__(self, cluster_node, cluster_handler_thread):
+        """
+        Connects to an AMQP backend.
+        """
+        self.cluster_handler_thread = cluster_handler_thread
+
+    def process(self, max_time=10):
+        """
+        Do bookkeeping, process messages, etc.
+        :param max_time: maximum time in seconds this call can take
+        :raises ConnectionFailedError: if connection failed in the meantime
+        """
+
+    def exchange_declare(self, exchange):
+        """
+        Declare an exchange
+        :param exchange: Exchange object
+        """
+
+    def exchange_delete(self, exchange):
+        """
+        Delete an exchange
+        :param exchange: Exchange object
+        """
+
+    def queue_bind(self, queue, exchange, routing_key=''):
+        """
+        Bind a queue to an exchange
+        :param queue: Queue object
+        :param exchange: Exchange object
+        :param routing_key: routing key to use
+        """
+
+    def queue_delete(self, queue):
+        """
+        Delete a queue.
+
+        :param queue: Queue
+        """
+
+
+    def queue_declare(self, queue):
+        """
+        Declare a queue.
+
+        This will change queue's name if anonymous
+        :param queue: Queue
+        """
+
+    def basic_cancel(self, consumer_tag):
+        """
+        Cancel consuming, identified by a consumer_tag
+        :param consumer_tag: consumer_tag to cancel
+        """
+
+    def basic_consume(self, queue, no_ack=False):
+        """
+        Start consuming from a queue
+        :param queue: Queue object
+        :param no_ack: Messages will not need to be ack()ed for this queue
+        """
+
+    def basic_ack(self, delivery_tag):
+        """
+        ACK a message.
+        :param delivery_tag: delivery tag to ack
+        """
+
+    def basic_qos(self, prefetch_size, prefetch_count, global_):
+        """
+        Issue a basic.qos(prefetch_size, prefetch_count, True) against broker
+        :param prefetch_size: prefetch window size in octets
+        :param prefetch_count: prefetch window in terms of whole messages
+        """
+
+    def basic_reject(self, delivery_tag):
+        """
+        Reject a message
+        :param delivery_tag: delivery tag to reject
+        """
+
+    def basic_publish(self, message, exchange, routing_key):
+        """
+        Send a message
+        :param message: Message object to send
+        :param exchange: Exchange object to publish to
+        :param routing_key: routing key to use
+        """
+
+    def shutdown(self):
+        """
+        Close this connection.
+        This is not allowed to return anything or raise
+        """
+        self.cluster_handler_thread = None  # break GC cycles
diff --git a/coolamqp/cluster.py b/coolamqp/cluster.py
deleted file mode 100644
index 21bbb37..0000000
--- a/coolamqp/cluster.py
+++ /dev/null
@@ -1,240 +0,0 @@
-# coding=UTF-8
-import itertools
-from six.moves import queue as Queue
-from coolamqp.backends import PyAMQPBackend
-from coolamqp.backends.base import Discarded
-from coolamqp.orders import SendMessage, ConsumeQueue, DeclareExchange, CancelQueue, DeleteQueue, \
-                    DeleteExchange, SetQoS, DeclareQueue, Order
-from coolamqp.messages import Exchange
-
-
-class ClusterNode(object):
-    """
-    Definition of a reachable AMQP node.
-
-    This object is hashable.
-    """
-
-    def __init__(self, *args, **kwargs):
-        """
-        Create a cluster node definition.
-
-            a = ClusterNode(host='192.168.0.1', user='admin', password='password',
-                            virtual_host='vhost')
-
-        or
-
-            a = ClusterNode('192.168.0.1', 'admin', 'password')
-
-        Additional keyword parameters that can be specified:
-            heartbeat - heartbeat interval in seconds
-        """
-
-        self.heartbeat = kwargs.pop('heartbeat', None)
-
-        if len(kwargs) > 0:
-            # Prepare arguments for amqp.connection.Connection
-            self.host = kwargs['host']
-            self.user = kwargs['user']
-            self.password = kwargs['password']
-            self.virtual_host = kwargs.get('virtual_host', '/')
-        elif len(args) == 3:
-            self.host, self.user, self.password = args
-            self.virtual_host = '/'
-        elif len(args) == 4:
-            self.host, self.user, self.password, self.virtual_host = args
-        else:
-            raise NotImplementedError #todo implement this
-
-    def __str__(self):
-        return '%s@%s/%s' % (self.host,
-                             self.user,
-                             self.virtual_host)
-
-
-class Cluster(object):
-    """
-    Represents connection to an AMQP cluster. This internally connects only to one node, but
-    will select another one upon connection failing.
-
-    You can pass callbacks to most commands. They will also return an Order instance,
-    that you can wait for to know an operation has completed.
-
-    Callbacks are executed before Order is marked as complete (it's .result() returns), so if you do:
-
-        cluster.send(.., on_completed=hello).result()
-        bye()
-
-    hello will be called before bye is called.
-    """
-
-    def __init__(self, nodes, backend=PyAMQPBackend):
-        """
-        Construct the cluster definition
-        :param nodes: iterable of nodes to try connecting, in this order.
-            if list if exhaused, it will be started from beginning
-        :param backend: backend to use
-        """
-
-        self.backend = backend
-        self.node_to_connect_to = itertools.cycle(nodes)
-
-        self.connected = False      #: public, is connected to broker?
-
-        from .handler import ClusterHandlerThread
-        self.thread = ClusterHandlerThread(self)
-
-    def send(self, message, exchange=None, routing_key='', discard_on_fail=False, on_completed=None, on_failed=None):
-        """
-        Schedule a message to be sent.
-        :param message: Message object to send.
-        :param exchange: Exchange to use. Leave None to use the default exchange
-        :param routing_key: routing key to use
-        :param discard_on_fail: if True, then message is valid for sending ONLY with current connection.
-            Will be discarded upon fail.
-        :param on_completed: callable/0 to call when this succeeds
-        :param on_failed: callable/1 to call when this fails with AMQPError instance
-            or Cancelled instance if user cancelled this order
-            or Discarded instance if message discarded due to 'discard_on_fail'
-        :return: a Future with this order's status
-        """
-        a = SendMessage(message, exchange or Exchange.direct, routing_key,
-                        discard_on_fail=discard_on_fail,
-                        on_completed=on_completed, on_failed=on_failed)
-
-        if discard_on_fail and self.thread.backend is None:
-            o = Order()
-            o.discarded = True
-            on_failed(Discarded())
-            return o
-            # discard at once if no point in sending
-
-        self.thread.order_queue.append(a)
-        return a
-
-    def declare_exchange(self, exchange, on_completed=None, on_failed=None):
-        """
-        Declare an exchange. It will be re-declared upon reconnection.
-
-        :param exchange: Exchange to declare
-        :param on_completed: callable/0 to call when this succeeds
-        :param on_failed: callable/1 to call when this fails with AMQPError instance
-        :return: a Future with this order's status
-        """
-        a = DeclareExchange(exchange, on_completed=on_completed, on_failed=on_failed)
-        self.thread.order_queue.append(a)
-        return a
-
-    def declare_queue(self, queue, on_completed=None, on_failed=None):
-        """
-        Declares a queue.
-
-        !!!! If you declare a queue and NOT consume from it, it will not be re-declared
-        upon reconnection !!!!
-
-        :param queue: Queue to declare
-        :param on_completed: callable/0 to call when this succeeds
-        :param on_failed: callable/1 to call when this fails with AMQPError instance
-        :return: a Future with this order's status
-        """
-        a = DeclareQueue(queue, on_completed=on_completed, on_failed=on_failed)
-        self.thread.order_queue.append(a)
-        return a
-
-    def delete_exchange(self, exchange, on_completed=None, on_failed=None):
-        """
-        Delete an exchange
-        :param exchange: Exchange to delete
-        :param on_completed: callable/0 to call when this succeeds
-        :param on_failed: callable/1 to call when this fails with AMQPError instance
-        :return: a Future with this order's status
-        """
-        a = DeleteExchange(exchange, on_completed=on_completed, on_failed=on_failed)
-        self.thread.order_queue.append(a)
-        return a
-
-    def delete_queue(self, queue, on_completed=None, on_failed=None):
-        """
-        Delete a queue
-        :param queue: Queue to delete
-        :param on_completed: callable/0 to call when this succeeds
-        :param on_failed: callable/1 to call when this fails with AMQPError instance
-        :return: a Future with this order's status
-        """
-        a = DeleteQueue(queue, on_completed=on_completed, on_failed=on_failed)
-        self.thread.order_queue.append(a)
-        return a
-
-    def cancel(self, queue, on_completed=None, on_failed=None):
-        """
-        Cancel consuming from a queue
-
-        :param queue: Queue to consume from
-        :param on_completed: callable/0 to call when this succeeds
-        :param on_failed: callable/1 to call when this fails with AMQPError instance
-        :return: a Future with this order's status
-        """
-        a = CancelQueue(queue, on_completed=on_completed, on_failed=on_failed)
-        self.thread.order_queue.append(a)
-        return a
-
-    def qos(self, prefetch_window, prefetch_count, global_=True):
-        a = SetQoS(prefetch_window, prefetch_count, global_)
-        self.thread.order_queue.append(a)
-        return a
-
-    def consume(self, queue, no_ack=False, on_completed=None, on_failed=None):
-        """
-        Start consuming from a queue
-
-        This queue will be declared to the broker. If this queue has any binds
-        (.exchange field is not empty), queue will be binded to exchanges.
-
-        :param queue: Queue to consume from
-        :param on_completed: callable/0 to call when this succeeds
-        :param no_ack: if True, you will not need to call .ack() for this queue
-        :param on_failed: callable/1 to call when this fails with AMQPError instance
-        :return: a Future with this order's status
-        """
-        a = ConsumeQueue(queue, no_ack=no_ack, on_completed=on_completed, on_failed=on_failed)
-        self.thread.order_queue.append(a)
-        return a
-
-    def drain(self, wait=0):
-        """
-        Return a ClusterEvent on what happened, or None if nothing could be obtained
-        within given time
-        :param wait: Interval to wait for events.
-            Finite number to wait this much seconds before returning None
-            None to wait for infinity
-            0 to return immediately
-        :return: a ClusterEvent instance or None
-        """
-        try:
-            if wait == 0:
-                return self.thread.event_queue.get(False)
-            else:
-                return self.thread.event_queue.get(True, wait)
-        except Queue.Empty:
-            return None
-
-    def start(self):
-        """
-        Connect to the cluster.
-        :return: self
-        """
-        self.thread.start()
-        return self
-
-    def shutdown(self, complete_remaining_tasks=False):
-        """
-        Cleans everything and returns.
-
-        :param complete_remaining_tasks_tasks: if set to True, pending operations will be completed.
-            If False, thread will exit without completing them.
-            This can mean that if the cluster doesn't come up online, shutdown MAY BLOCK FOREVER.
-        """
-        self.thread.complete_remaining_upon_termination = complete_remaining_tasks
-        self.thread.terminate()
-        self.thread.join()
-        # thread closes the AMQP uplink for us
diff --git a/coolamqp/connection/definition.py b/coolamqp/connection/definition.py
index fe4695f..f33cc9a 100644
--- a/coolamqp/connection/definition.py
+++ b/coolamqp/connection/definition.py
@@ -1,28 +1,47 @@
 # coding=UTF-8
 from __future__ import absolute_import, division, print_function
-
+import six
 
 
 class NodeDefinition(object):
     """
-    Definition of a node
+    Definition of a reachable AMQP node.
+
+    This object is hashable.
     """
 
-    def __init__(self, host, port, user, password, virtual_host='/', heartbeat=None):
+    def __init__(self, *args, **kwargs):
         """
-        All necessary information to establish a link to a broker.
-
-        :param host: TCP host, str
-        :param port: TCP port, int
-        :param user: AMQP user
-        :param password: AMQP password
-        :param virtual_host: AMQP virtual host
-        :param amqp_version: AMQP protocol version
+        Create a cluster node definition.
+
+            a = NodeDefinition(host='192.168.0.1', user='admin', password='password',
+                               virtual_host='vhost')
+
+        or
+
+            a = NodeDefinition('192.168.0.1', 'admin', 'password')
+
+        Additional keyword parameters that can be specified:
+            heartbeat - heartbeat interval in seconds
+            port - TCP port to use. Default is 5672
         """
-        self.user = user
-        self.password = password
-        self.host = host
-        self.port = port
-        self.virtual_host = virtual_host
-        self.heartbeat = heartbeat
 
+        self.heartbeat = kwargs.pop('heartbeat', None)
+        self.port = kwargs.pop('port', 5672)
+
+        if len(kwargs) > 0:
+            # Prepare arguments for amqp.connection.Connection
+            self.host = kwargs['host']
+            self.user = kwargs['user']
+            self.password = kwargs['password']
+            self.virtual_host = kwargs.get('virtual_host', '/')
+        elif len(args) == 3:
+            self.host, self.user, self.password = args
+            self.virtual_host = '/'
+        elif len(args) == 4:
+            self.host, self.user, self.password, self.virtual_host = args
+        else:
+            raise NotImplementedError #todo implement this
+
+    def __str__(self):
+        return six.text_type(b'amqp://%s:%s@%s/%s'.encode('utf8') % (self.host, self.port, self.user, self.virtual_host))
diff --git a/coolamqp/handler.py b/coolamqp/handler.py
deleted file mode 100644
index 1c32fde..0000000
--- a/coolamqp/handler.py
+++ /dev/null
@@ -1,294 +0,0 @@
-# coding=UTF-8
-import threading
-from six.moves import queue
-import six
-import logging
-import collections
-import time
-from .backends import ConnectionFailedError, RemoteAMQPError, Cancelled
-from .messages import Exchange
-from .events import ConnectionUp, ConnectionDown, ConsumerCancelled, MessageReceived
-from .orders import SendMessage, DeclareExchange, ConsumeQueue, CancelQueue, \
-                    AcknowledgeMessage, NAcknowledgeMessage, DeleteQueue, \
-                    DeleteExchange, SetQoS, DeclareQueue
-
-logger = logging.getLogger(__name__)
-
-
-class _ImOuttaHere(Exception):
-    """Thrown upon thread terminating.
-    Thrown only if complete_remaining_upon_termination is False"""
-
-
-class ClusterHandlerThread(threading.Thread):
-    """
-    Thread that does bookkeeping for a Cluster.
-    """
-    def __init__(self, cluster):
-        """
-        :param cluster: coolamqp.Cluster
-        """
-        threading.Thread.__init__(self)
-
-        self.cluster = cluster
-        self.daemon = True      # if you don't explicitly wait for me, that means you don't need to
-        self.is_terminating = False
-        self.complete_remaining_upon_termination = False
-        self.order_queue = collections.deque()    # queue for inbound orders
-        self.event_queue = queue.Queue()    # queue for tasks done
-        self.connect_id = -1                # connectID of current connection
-
-        self.declared_exchanges = {}        # declared exchanges, by their names
-        self.queues_by_consumer_tags = {}   # tuple of (subbed queue, no_ack::bool), by consumer tags
-
-        self.backend = None
-        self.first_connect = True
-
-        self.qos = None # or tuple (prefetch_size, prefetch_count) if QoS set
-
-    def _reconnect_attempt(self):
-        """Single attempt to regain connectivity. May raise ConnectionFailedError"""
-        self.backend = None
-        if self.backend is not None:
-            self.backend.shutdown()
-            self.backend = None
-
-        self.connect_id += 1
-        node = six.next(self.cluster.node_to_connect_to)
-        logger.info('Connecting to %s', node)
-
-        self.backend = self.cluster.backend(node, self)
-
-        if self.qos is not None:
-            pre_siz, pre_cou, glob = self.qos
-            self.backend.basic_qos(pre_siz, pre_cou, glob)
-
-        for exchange in self.declared_exchanges.values():
-            self.backend.exchange_declare(exchange)
-
-        failed_queues = []
-        for queue, no_ack in self.queues_by_consumer_tags.values():
-            while True:
-                try:
-                    self.backend.queue_declare(queue)
-                    if queue.exchange is not None:
-                        self.backend.queue_bind(queue, queue.exchange)
-                    self.backend.basic_consume(queue, no_ack=no_ack)
-                    logger.info('Consuming from %s no_ack=%s', queue, no_ack)
-                except RemoteAMQPError as e:
-                    if e.code in (403, 405):  # access refused, resource locked
-                        # Ok, queue, what should we do?
-                        if queue.locked_after_reconnect == 'retry':
-                            time.sleep(0.1)
-                            continue    # retry until works
-                        elif queue.locked_after_reconnect == 'cancel':
-                            self.event_queue.put(ConsumerCancelled(queue, ConsumerCancelled.REFUSED_ON_RECONNECT))
-                            failed_queues.append(queue)
-                        elif queue.locked_after_reconnect == 'defer':
-                            self.order_queue.append(ConsumeQueue(queue, no_ack=no_ack))
-                            failed_queues.append(queue)
-                        else:
-                            raise Exception('wtf')
-                    else:
-                        raise  # idk
-                break
-
-        for failed_queue in failed_queues:
-            del self.queues_by_consumer_tags[failed_queue.consumer_tag]
-
-    def _reconnect(self):
-        """Regain connectivity to cluster. May block for a very long time,
-        as it will not """
-        exponential_backoff_delay = 1
-
-        while not self.cluster.connected:
-            try:
-                self._reconnect_attempt()
-            except ConnectionFailedError as e:
-                # a connection failure happened :(
-                logger.warning('Connecting failed due to %s while connecting and initial setup', repr(e))
-                self.cluster.connected = False
-                if self.backend is not None:
-                    self.backend.shutdown()
-                    self.backend = None # good policy to release resources before you sleep
-                time.sleep(exponential_backoff_delay)
-
-                if self.is_terminating and (not self.complete_remaining_upon_termination):
-                    raise _ImOuttaHere()
-
-                exponential_backoff_delay = min(60, exponential_backoff_delay * 2)
-            else:
-                logger.info('Connected to AMQP broker via %s', self.backend)
-                self.cluster.connected = True
-                self.event_queue.put(ConnectionUp(initial=self.first_connect))
-                self.first_connect = False
-
-
-    def perform_order(self):
-        order = self.order_queue.popleft()
-
-        try:
-            if order.cancelled:
-                logger.debug('Order %s was cancelled', order)
-                order._failed(Cancelled())
-                return
-
-            if isinstance(order, SendMessage):
-                self.backend.basic_publish(order.message, order.exchange, order.routing_key)
-            elif isinstance(order, SetQoS):
-                self.qos = order.qos
-                pre_siz, pre_cou, glob = order.qos
-                self.backend.basic_qos(pre_siz, pre_cou, glob)
-            elif isinstance(order, DeclareExchange):
-                self.backend.exchange_declare(order.exchange)
-                self.declared_exchanges[order.exchange.name] = order.exchange
-            elif isinstance(order, DeleteExchange):
-                self.backend.exchange_delete(order.exchange)
-                if order.exchange.name in self.declared_exchanges:
-                    del self.declared_exchanges[order.exchange.name]
-            elif isinstance(order, DeclareQueue):
-                self.backend.queue_declare(order.queue)
-            elif isinstance(order, DeleteQueue):
-                self.backend.queue_delete(order.queue)
-            elif isinstance(order, ConsumeQueue):
-                if order.queue.consumer_tag in self.queues_by_consumer_tags:
-                    order._completed()
-                    return    # already consuming, belay that
-
-                self.backend.queue_declare(order.queue)
-
-                if order.queue.exchange is not None:
-                    self.backend.queue_bind(order.queue, order.queue.exchange)
-
-                self.backend.basic_consume(order.queue, no_ack=order.no_ack)
-                self.queues_by_consumer_tags[order.queue.consumer_tag] = order.queue, order.no_ack
-            elif isinstance(order, CancelQueue):
-                try:
-                    q, no_ack = self.queues_by_consumer_tags.pop(order.queue.consumer_tag)
-                except KeyError:
-                    pass  # wat?
-                else:
-                    self.backend.basic_cancel(order.queue.consumer_tag)
-                    self.event_queue.put(ConsumerCancelled(order.queue, ConsumerCancelled.USER_CANCEL))
-            elif isinstance(order, AcknowledgeMessage):
-                if order.connect_id == self.connect_id:
-                    self.backend.basic_ack(order.delivery_tag)
-            elif isinstance(order, NAcknowledgeMessage):
-                if order.connect_id == self.connect_id:
-                    self.backend.basic_reject(order.delivery_tag)
-        except RemoteAMQPError as e:
-            logger.error('Remote AMQP error: %s', e)
-            order._failed(e)  # we are allowed to go on
-        except ConnectionFailedError as e:
-            logger.error('Connection failed while %s: %s', order, e)
-            self.order_queue.appendleft(order)
-            raise
-        else:
-            order._completed()
-
-    def __run_wrap(self):   # throws _ImOuttaHere
-        # Loop while there are things to do
-        while (not self.is_terminating) or (len(self.order_queue) > 0):
-            try:
-                while len(self.order_queue) > 0:
-                    self.perform_order()
-
-                # just drain shit
-                self.backend.process(max_time=0.05)
-            except ConnectionFailedError as e:
-                logger.warning('Connection to broker lost: %s', e)
-                self.cluster.connected = False
-                self.event_queue.put(ConnectionDown())
-
-                # =========================== remove SendMessagees with discard_on_fail
-                my_orders = []      # because order_queue is used by many threads
-                while len(self.order_queue) > 0:
-                    order = self.order_queue.popleft()
-                    if isinstance(order, SendMessage):
-                        if order.message.discard_on_fail:
-                            order._discard()
-                            continue
-
-                    my_orders.append(order)
-
-                # Ok, we have them in order of execution. Append-left in reverse order
-                # to preserve previous order
-                for order in reversed(my_orders):
-                    my_orders.appendleft(order)
-
-            self._reconnect()
-
-    def run(self):
-        try:
-            self._reconnect()
-            self.__run_wrap()
-        except _ImOuttaHere:
-            pass
-
-        assert self.is_terminating
-        if self.cluster.connected or (self.backend is not None):
-            if self.backend is not None:
-                self.backend.shutdown()
-                self.backend = None
-
-            self.cluster.connected = False
-
-    def terminate(self):
-        """
-        Called by Cluster. Tells to finish all jobs and quit.
-        Unacked messages will not be acked. If this is called, connection may die at any time.
-        """
-        self.is_terminating = True
-
-    ## events called
-    def _on_recvmessage(self, body, exchange_name, routing_key, delivery_tag, properties):
-        """
-        Upon receiving a message
-        """
-        from .messages import ReceivedMessage
-
-        self.event_queue.put(MessageReceived(ReceivedMessage(body, self,
-                                                             self.connect_id,
-                                                             exchange_name,
-                                                             routing_key,
-                                                             properties,
-                                                             delivery_tag=delivery_tag)))
-
-    def _on_consumercancelled(self, consumer_tag):
-        """
-        A consumer has been cancelled
-        """
-        try:
-            queue, no_ack = self.queues_by_consumer_tags.pop(consumer_tag)
-        except KeyError:
-            return  # what?
-
-        self.event_queue.put(ConsumerCancelled(queue, ConsumerCancelled.BROKER_CANCEL))
-
-    ## methods to enqueue something into CHT to execute
-
-    def _do_ackmessage(self, receivedMessage, on_completed=None):
-        """
-        Order acknowledging a message.
-        :param receivedMessage: a ReceivedMessage object to ack
-        :param on_completed: callable/0 to call when acknowledgemenet succeeded
-        :return: an AcknowledgeMess
-        """
-        a = AcknowledgeMessage(receivedMessage.connect_id,
-                                                   receivedMessage.delivery_tag,
-                                                   on_completed=on_completed)
-        self.order_queue.append(a)
-        return a
-
-
-    def _do_nackmessage(self, receivedMessage, on_completed=None):
-        """
-        Order acknowledging a message.
-        :param receivedMessage: a ReceivedMessage object to ack
-        :param on_completed: callable/0 to call when acknowledgemenet succeeded
-        """
-        a = NAcknowledgeMessage(receivedMessage.connect_id,
-                                receivedMessage.delivery_tag,
-                                on_completed=on_completed)
-        self.order_queue.append(a)
-        return a
diff --git a/coolamqp/messages.py b/coolamqp/messages.py
index 35c7b68..a7900ad 100644
--- a/coolamqp/messages.py
+++ b/coolamqp/messages.py
@@ -84,12 +84,9 @@ Exchange.direct = Exchange()
 class Queue(object):
     """
     This object represents a Queue that applications consume from or publish to.
-
-    Caveat: Please note the locked_after_reconnect option in constructor
     """
 
-    def __init__(self, name='', durable=False, exchange=None, exclusive=False, auto_delete=False,
-                 locked_after_reconnect='retry'):
+    def __init__(self, name='', durable=False, exchange=None, exclusive=False, auto_delete=False):
         """
         Create a queue definition.
 
@@ -102,12 +99,6 @@ class Queue(object):
         :param exchange: Exchange for this queue to bind to. None for no binding.
         :param exclusive: Is this queue exclusive?
         :param auto_delete: Is this queue auto_delete ?
-        :param locked_after_reconnect: Behaviour when queue is exclusive and ACCESS_REFUSED/RESOURCE_LOCKED
-            is seen on reconnect. Because broker might not know that we have failed, 'retry' will
-            try again until succeeds (default option). This might block for a long time, until the broker
-            realizes previous connection is dead and deletes the queue.
-            'cancel' will return a ConsumerCancelled to client
-            'defer' will attempt to configure the queue later, but will not block other tasks from progressing.
         """
         self.name = name
         # if name is '', this will be filled in with broker-generated name upon declaration
@@ -119,5 +110,9 @@ class Queue(object):
         self.anonymous = name == ''  # if this queue is anonymous, it must be regenerated upon reconnect
 
         self.consumer_tag = name if name != '' else uuid.uuid4().hex    # consumer tag to use in AMQP comms
-        self.locked_after_reconnect = locked_after_reconnect
-        assert locked_after_reconnect in ('retry', 'cancel', 'defer')
\ No newline at end of file
+
+    def __eq__(self, other):
+        return self.name == other.name
+
+    def __hash__(self):
+        return hash(self.name)
diff --git a/coolamqp/uplink/__init__.py b/coolamqp/uplink/__init__.py
index 56ef2cf..428f48a 100644
--- a/coolamqp/uplink/__init__.py
+++ b/coolamqp/uplink/__init__.py
@@ -13,3 +13,4 @@ from __future__ import absolute_import, division, print_function
 
 from coolamqp.uplink.connection import Connection, HeaderOrBodyWatch, MethodWatch, AnyWatch
 from coolamqp.uplink.listener import ListenerThread
+from coolamqp.uplink.handshake import PUBLISHER_CONFIRMS, CONSUMER_CANCEL_NOTIFY
diff --git a/coolamqp/uplink/handshake.py b/coolamqp/uplink/handshake.py
index 87cd6df..c35423f 100644
--- a/coolamqp/uplink/handshake.py
+++ b/coolamqp/uplink/handshake.py
@@ -9,9 +9,13 @@ from coolamqp.framing.definitions import ConnectionStart, ConnectionStartOk, \
 from coolamqp.framing.frames import AMQPMethodFrame
 from coolamqp.uplink.connection.states import ST_ONLINE
 
+
+PUBLISHER_CONFIRMS = b'publisher_confirms'
+CONSUMER_CANCEL_NOTIFY = b'consumer_cancel_notify'
+
 SUPPORTED_EXTENSIONS = [
-    b'publisher_confirms',
-    b'consumer_cancel_notify'
+    PUBLISHER_CONFIRMS,
+    CONSUMER_CANCEL_NOTIFY
 ]
 
 CLIENT_DATA = [
@@ -32,8 +36,7 @@ class Handshaker(object):
     Object that given a connection rolls the handshake.
     """
 
-    def __init__(self, connection, node_definition,
-                 on_success):
+    def __init__(self, connection, node_definition, on_success):
         """
         :param connection: Connection instance to use
         :type node_definition: NodeDefinition
@@ -74,6 +77,7 @@ class Handshaker(object):
         server_props = dict(payload.server_properties)
         if b'capabilities' in server_props:
             for label, fv in server_props[b'capabilities'][0]:
+                # broker advertised capability `label`; keep only supported ones
                 if label in SUPPORTED_EXTENSIONS:
                     if fv[0]:
                         self.connection.extensions.append(label)
diff --git a/coolamqp/uplink/listener/__init__.py b/coolamqp/uplink/listener/__init__.py
index 897550a..8dd8152 100644
--- a/coolamqp/uplink/listener/__init__.py
+++ b/coolamqp/uplink/listener/__init__.py
@@ -16,4 +16,3 @@ from __future__ import absolute_import, division, print_function
 
 
 from coolamqp.uplink.listener.thread import ListenerThread
-from coolamqp.uplink.connection import Connection
diff --git a/tests/run.py b/tests/run.py
index d1fca11..3db999c 100644
--- a/tests/run.py
+++ b/tests/run.py
@@ -6,11 +6,11 @@ from coolamqp.connection import NodeDefinition
 from coolamqp.uplink import Connection
 import logging
 
-from coolamqp.attaches import Consumer
+from coolamqp.attaches import Consumer, Publisher, MODE_NOACK, MODE_CNPUB
 from coolamqp.messages import Queue
 
 
-NODE = NodeDefinition('127.0.0.1', 5672, 'user', 'user', heartbeat=5)
+NODE = NodeDefinition('127.0.0.1', 'user', 'user', heartbeat=20)
 logging.basicConfig(level=logging.DEBUG)
 
 if __name__ == '__main__':
@@ -23,6 +23,12 @@ if __name__ == '__main__':
     cons = Consumer(Queue('siema-eniu'), no_ack=False)
     cons.attach(con)
 
+    pub1 = Publisher(MODE_NOACK)
+    pub2 = Publisher(MODE_CNPUB)
+
+    pub1.attach(con)
+    pub2.attach(con)
+
     while True:
         time.sleep(10)
 
diff --git a/tests/test_attaches/__init__.py b/tests/test_attaches/__init__.py
new file mode 100644
index 0000000..1c762b1
--- /dev/null
+++ b/tests/test_attaches/__init__.py
@@ -0,0 +1,8 @@
+# coding=UTF-8
+from __future__ import print_function, absolute_import, division
+import six
+import logging
+
+logger = logging.getLogger(__name__)
+
+
diff --git a/tests/test_attaches/test_utils.py b/tests/test_attaches/test_utils.py
new file mode 100644
index 0000000..68711ab
--- /dev/null
+++ b/tests/test_attaches/test_utils.py
@@ -0,0 +1,62 @@
+# coding=UTF-8
+"""
+It sounds like a melody
+"""
+from __future__ import print_function, absolute_import, division
+import six
+import unittest
+
+
+from coolamqp.attaches.utils import ManualConfirmableRejectable, AtomicTagger
+
+
+class TestAtomicTagger(unittest.TestCase):
+
+    def test_insertionOrder(self):
+        at = AtomicTagger()
+
+        a1 = at.get_key()
+        a2 = at.get_key()
+        a3 = at.get_key()
+
+        at.deposit(a1, b'ABC')
+        at.deposit(a3, b'GHI')
+        at.deposit(a2, b'DEF')
+
+        self.assertEqual(at.tags[0][1], b'ABC')
+        self.assertEqual(at.tags[1][1], b'DEF')
+        self.assertEqual(at.tags[2][1], b'GHI')
+
+    def test_1(self):
+
+        at = AtomicTagger()
+
+        a1 = at.get_key()
+        a2 = at.get_key()
+        a3 = at.get_key()
+
+        n1 = at.get_key()
+        n2 = at.get_key()
+        n3 = at.get_key()
+
+        P = {'acked_P': False, 'nacked_P': False, 'acked_N': False, 'nacked_N': False}
+
+        def assigner(nam, val=True):
+            def x():
+                P[nam] = val
+            return x
+
+
+        at.deposit(a2, ManualConfirmableRejectable(assigner('acked_P'), assigner('nacked_P')))
+        at.deposit(n2, ManualConfirmableRejectable(assigner('acked_N'), assigner('nacked_N')))
+
+        print(at.tags)
+
+        at.ack(a3, True)
+        at.nack(n3, True)
+
+        self.assertTrue(P['acked_P'] and (not P['nacked_P']) and P['nacked_N'] and (not P['acked_N']))
+
+
+
+        
-- 
GitLab