From 54d7312a567d8b3d8c38668028a57ee0c00566be Mon Sep 17 00:00:00 2001
From: Piotr Maslanka <piotr.maslanka@henrietta.com.pl>
Date: Tue, 10 Jan 2017 16:54:51 +0100
Subject: [PATCH] drainer

---
 README.md                                |  2 +-
 coolamqp/clustering/__init__.py          |  1 +
 coolamqp/clustering/cluster.py           | 24 ++++++++--
 coolamqp/clustering/events.py            | 57 ++++++++++++++++++++++++
 coolamqp/clustering/single.py            |  9 +++-
 coolamqp/objects.py                      | 26 ++++++++++-
 coolamqp/uplink/connection/connection.py | 20 ++-------
 setup.py                                 |  5 ++-
 tests/test_clustering/test_a.py          | 13 +++++-
 9 files changed, 128 insertions(+), 29 deletions(-)
 create mode 100644 coolamqp/clustering/events.py

diff --git a/README.md b/README.md
index 40da018..d13b002 100644
--- a/README.md
+++ b/README.md
@@ -27,6 +27,6 @@ Enjoy!
 Assertions are sprinkled throughout the code. You may wish to run with optimizations enabled
 if you need every CPU cycle you can get.
 
-**v0.8x** series has unstable API.
+**v0.8x** series has unstable API. It probably won't change much
 
 **v0.9x** series will have a stable API.
\ No newline at end of file
diff --git a/coolamqp/clustering/__init__.py b/coolamqp/clustering/__init__.py
index 4558112..ca17e32 100644
--- a/coolamqp/clustering/__init__.py
+++ b/coolamqp/clustering/__init__.py
@@ -12,3 +12,4 @@ logger = logging.getLogger(__name__)
 __all__ = ('Cluster')
 
 from coolamqp.clustering.cluster import Cluster
+from coolamqp.clustering.events import MessageReceived, NothingMuch, ConnectionLost
diff --git a/coolamqp/clustering/cluster.py b/coolamqp/clustering/cluster.py
index d17a8e7..19fea7b 100644
--- a/coolamqp/clustering/cluster.py
+++ b/coolamqp/clustering/cluster.py
@@ -11,11 +11,14 @@ from coolamqp.uplink import ListenerThread
 from coolamqp.clustering.single import SingleNodeReconnector
 from coolamqp.attaches import Publisher, AttacheGroup, Consumer
 from coolamqp.objects import Future, Exchange
-from six.moves.queue import Queue
 
 
+from coolamqp.clustering.events import ConnectionLost, MessageReceived, NothingMuch
+
 logger = logging.getLogger(__name__)
 
+THE_POPE_OF_NOPE = NothingMuch()
+
 
 class Cluster(object):
     """
@@ -48,7 +51,10 @@ class Cluster(object):
 
         self.attache_group = AttacheGroup()
 
+        self.events = six.moves.queue.Queue()   # for coolamqp.clustering.events.*
+
         self.snr = SingleNodeReconnector(self.node, self.attache_group, self.listener)
+        self.snr.on_fail.add(lambda: self.events.put_nowait(ConnectionLost()))
 
         # Spawn a transactional publisher and a noack publisher
         self.pub_tr = Publisher(Publisher.MODE_CNPUB)
@@ -57,7 +63,19 @@ class Cluster(object):
         self.attache_group.add(self.pub_tr)
         self.attache_group.add(self.pub_na)
 
-        self.events = Queue()   # for
+    def drain(self, timeout):
+        """
+        Return an Event.
+        :param timeout: time to wait for an event. 0 means return immediately. None means block forever
+        :return: an Event instance. NothingMuch is returned when there's nothing within a given timoeout
+        """
+        try:
+            if timeout == 0:
+                return self.events.get_nowait()
+            else:
+                return self.events.get(True, timeout)
+        except six.moves.queue.Empty:
+            return THE_POPE_OF_NOPE
 
     def consume(self, queue, on_message=None, *args, **kwargs):
         """
@@ -76,7 +94,7 @@ class Cluster(object):
         :return: a tuple (Consumer instance, and a Future), that tells, when consumer is ready
         """
         fut = Future()
-        on_message = on_message or self.events.put_nowait
+        on_message = on_message or (lambda rmsg: self.events.put_nowait(MessageReceived(rmsg)))
         con = Consumer(queue, on_message, future_to_notify=fut, *args, **kwargs)
         self.attache_group.add(con)
         return con, fut
diff --git a/coolamqp/clustering/events.py b/coolamqp/clustering/events.py
new file mode 100644
index 0000000..a45f896
--- /dev/null
+++ b/coolamqp/clustering/events.py
@@ -0,0 +1,57 @@
+# coding=UTF-8
+"""
+Cluster will emit Events.
+
+They mean that something, like, happened.
+"""
+from __future__ import print_function, absolute_import, division
+import six
+import time
+import logging
+import monotonic
+from coolamqp.objects import ReceivedMessage
+
+logger = logging.getLogger(__name__)
+
+
+class Event(object):
+    """
+    An event emitted by Cluster
+    """
+
+class Timestamped(Event):
+    """
+    Notes the time of the event (as result of .monotonic)
+    """
+    def __init__(self):
+        super(Timestamped, self).__init__()
+        self.ts = monotonic.monotonic()
+
+
+class NothingMuch(Event):
+    """Nothing happened :D"""
+
+
+class ConnectionLost(Timestamped):
+    """
+    We have lost the connection.
+
+    NOTE that we don't have a ConnectionUp, since re-establishing a connection
+    is a complicated process. Broker might not have recognized the failure,
+    and some queues will be blocked, some might be ok, and the state
+    might be just a bit noisy.
+
+    Please examine your Consumer's .state's to check whether link was regained
+    """
+
+
+class MessageReceived(ReceivedMessage, Timestamped):
+    """
+    Something that works as an ersatz ReceivedMessage, but is an event
+    """
+    def __init__(self, msg):
+        """:type msg: ReceivedMessage"""
+        ReceivedMessage.__init__(self, msg.body, msg.exchange_name, msg.routing_key,
+                                 properties=msg.properties, delivery_tag=msg.delivery_tag,
+                                 ack=msg.ack, nack=msg.nack)
+        Timestamped.__init__(self)
diff --git a/coolamqp/clustering/single.py b/coolamqp/clustering/single.py
index f635163..fe2b97f 100644
--- a/coolamqp/clustering/single.py
+++ b/coolamqp/clustering/single.py
@@ -4,6 +4,7 @@ import six
 import logging
 
 from coolamqp.uplink import Connection
+from coolamqp.objects import Callable
 
 logger = logging.getLogger(__name__)
 
@@ -21,6 +22,10 @@ class SingleNodeReconnector(object):
 
         self.terminating = False
 
+        self.on_fail = Callable()       #: public
+
+        self.on_fail.add(self._on_fail)
+
     def is_connected(self):
         return self.connection is not None
 
@@ -31,9 +36,9 @@ class SingleNodeReconnector(object):
         self.connection = Connection(self.node_def, self.listener_thread)
         self.attache_group.attach(self.connection)
         self.connection.start()
-        self.connection.add_finalizer(self.on_fail)
+        self.connection.finalize.add(self.on_fail)
 
-    def on_fail(self):
+    def _on_fail(self):
         if self.terminating:
             return
 
diff --git a/coolamqp/objects.py b/coolamqp/objects.py
index 31a099f..064f228 100644
--- a/coolamqp/objects.py
+++ b/coolamqp/objects.py
@@ -17,6 +17,26 @@ logger = logging.getLogger(__name__)
 EMPTY_PROPERTIES = MessageProperties()
 
 
+class Callable(object):
+    """
+    Add a bunch of callables to one list, and just invoke'm.
+    INTERNAL USE ONLY
+    """
+    def __init__(self, oneshots=False):
+        """:param oneshots: if True, callables will be called and discarded"""
+        self.callables = []
+        self.oneshots = oneshots
+
+    def add(self, callable):
+        self.callables.append(callable)
+
+    def __call__(self, *args, **kwargs):
+        for callable in self.callables:
+            callable(*args, **kwargs)
+        if self.oneshots:
+            self.callables = []
+
+
 class Message(object):
     """
     An AMQP message. Has a binary body, and some properties.
@@ -34,12 +54,12 @@ class Message(object):
         Please take care with passing empty bodies, as py-amqp has some failure on it.
 
         :param body: stream of octets
-        :type body: str (py2) or bytes (py3)
+        :type body: anything with a buffer interface
         :param properties: AMQP properties to be sent along.
                            default is 'no properties at all'
                            You can pass a dict - it will be passed to MessageProperties,
                            but it's slow - don't do that.
-        :type properties: MessageProperties instance, None or a dict
+        :type properties: MessageProperties instance, None or a dict (SLOW!)
         """
         if isinstance(body, six.text_type):
             raise TypeError(u'body cannot be a text type!')
@@ -56,6 +76,7 @@ class Message(object):
 
 LAMBDA_NONE = lambda: None
 
+
 class ReceivedMessage(Message):
     """
     A message that was received from the AMQP broker.
@@ -159,6 +180,7 @@ class Queue(object):
 
 class Future(concurrent.futures.Future):
     """
+    INTERNAL USE ONLY
     Future returned by asynchronous CoolAMQP methods.
 
     A strange future (only one thread may wait for it)
diff --git a/coolamqp/uplink/connection/connection.py b/coolamqp/uplink/connection/connection.py
index 41f9f34..c192512 100644
--- a/coolamqp/uplink/connection/connection.py
+++ b/coolamqp/uplink/connection/connection.py
@@ -13,7 +13,7 @@ from coolamqp.uplink.handshake import Handshaker
 from coolamqp.framing.definitions import ConnectionClose, ConnectionCloseOk
 from coolamqp.uplink.connection.watches import MethodWatch
 from coolamqp.uplink.connection.states import ST_ONLINE, ST_OFFLINE, ST_CONNECTING
-
+from coolamqp.objects import Callable
 
 logger = logging.getLogger(__name__)
 
@@ -53,7 +53,7 @@ class Connection(object):
         self.watches = {}    # channel => list of [Watch instance]
         self.any_watches = []   # list of Watches that should check everything
 
-        self.finalizers = []
+        self.finalize = Callable(oneshots=True)  #: public
 
 
         self.state = ST_CONNECTING
@@ -117,19 +117,6 @@ class Connection(object):
         self.sendf = SendingFramer(self.listener_socket.send)
         self.watch_for_method(0, (ConnectionClose, ConnectionCloseOk), self.on_connection_close)
 
-    def add_finalizer(self, callable):
-        """
-        Add a callable to be executed when all watches were failed and we're really going down.
-
-        Finalizers are not used for logic stuff, but for situations like making TCP reconnects.
-        When we are making a reconnect, we need to be sure that all watches fired - so logic is intact.
-
-        DO NOT PUT CALLABLES THAT HAVE TO DO WITH STATE THINGS, ESPECIALLY ATTACHES.
-
-        :param callable: callable/0
-        """
-        self.finalizers.append(callable)
-
     def on_fail(self):
         """
         Called by event loop when the underlying connection is closed.
@@ -159,8 +146,7 @@ class Connection(object):
         self.any_watches = []
 
         # call finalizers
-        while len(self.finalizers) > 0:
-            self.finalizers.pop()()
+        self.finalize()
 
     def on_connection_close(self, payload):
         """
diff --git a/setup.py b/setup.py
index 2a2fb2a..ab02421 100644
--- a/setup.py
+++ b/setup.py
@@ -9,7 +9,7 @@ setup(name=u'CoolAMQP',
       author=u'DMS Serwis s.c.',
       author_email=u'piotrm@smok.co',
       url=u'https://github.com/smok-serwis/coolamqp',
-#      download_url='https://github.com/smok-serwis/coolamqp/archive/v0.12.zip',
+      download_url='https://github.com/smok-serwis/coolamqp/archive/v0.12.zip',
       keywords=['amqp', 'rabbitmq', 'client', 'network', 'ha', 'high availability'],
       packages=[
           'coolamqp',
@@ -23,7 +23,8 @@ setup(name=u'CoolAMQP',
           'coolamqp.framing.compilation',
       ],
       license=u'MIT License',
-      long_description=u'Best-in-class Python AMQP client',
+      long_description=u'''AMQP client, but with dynamic class generation and memoryviews FOR THE GODSPEED.
+Also, handles your reconnects and transactionality THE RIGHT WAY''',
       requires=['amqp', 'six', 'monotonic'],
       tests_require=["nose"],
       test_suite='nose.collector',
diff --git a/tests/test_clustering/test_a.py b/tests/test_clustering/test_a.py
index 8cadf4c..67ebff9 100644
--- a/tests/test_clustering/test_a.py
+++ b/tests/test_clustering/test_a.py
@@ -7,7 +7,7 @@ import six
 import unittest
 import time, logging, threading
 from coolamqp.objects import Message, MessageProperties, NodeDefinition, Queue, ReceivedMessage
-from coolamqp.clustering import Cluster
+from coolamqp.clustering import Cluster, MessageReceived, NothingMuch
 
 import time
 
@@ -124,4 +124,13 @@ class TestA(unittest.TestCase):
     def test_consumer_cancel(self):
         con, fut = self.c.consume(Queue(u'hello', exclusive=True, auto_delete=True))
         fut.result()
-        con.cancel().result()
\ No newline at end of file
+        con.cancel().result()
+
+    def test_drain_1(self):
+        con, fut = self.c.consume(Queue(u'hello', exclusive=True, auto_delete=True))
+        fut.result()
+
+        self.c.publish(Message(b'ioi'), routing_key=u'hello')
+
+        self.assertIsInstance(self.c.drain(2), MessageReceived)
+        self.assertIsInstance(self.c.drain(1), NothingMuch)
-- 
GitLab