diff --git a/README.md b/README.md index 40da0188b3bcd644851329c988b1a572d430e5d5..d13b00283b22773601e427ffcdb2fbb932afbe90 100644 --- a/README.md +++ b/README.md @@ -27,6 +27,6 @@ Enjoy! Assertions are sprinkled throughout the code. You may wish to run with optimizations enabled if you need every CPU cycle you can get. -**v0.8x** series has unstable API. +**v0.8x** series has unstable API. It probably won't change much **v0.9x** series will have a stable API. \ No newline at end of file diff --git a/coolamqp/clustering/__init__.py b/coolamqp/clustering/__init__.py index 455811291383ec21d5aad6490cc8bec928e5215c..ca17e3262079602596775eb6f515e5c50bbab2ed 100644 --- a/coolamqp/clustering/__init__.py +++ b/coolamqp/clustering/__init__.py @@ -12,3 +12,4 @@ logger = logging.getLogger(__name__) __all__ = ('Cluster') from coolamqp.clustering.cluster import Cluster +from coolamqp.clustering.events import MessageReceived, NothingMuch, ConnectionLost diff --git a/coolamqp/clustering/cluster.py b/coolamqp/clustering/cluster.py index d17a8e71319e6a25296dbd959a660d336f0ff287..19fea7b3668b450118a784dc901f955bed2b5b35 100644 --- a/coolamqp/clustering/cluster.py +++ b/coolamqp/clustering/cluster.py @@ -11,11 +11,14 @@ from coolamqp.uplink import ListenerThread from coolamqp.clustering.single import SingleNodeReconnector from coolamqp.attaches import Publisher, AttacheGroup, Consumer from coolamqp.objects import Future, Exchange -from six.moves.queue import Queue +from coolamqp.clustering.events import ConnectionLost, MessageReceived, NothingMuch + logger = logging.getLogger(__name__) +THE_POPE_OF_NOPE = NothingMuch() + class Cluster(object): """ @@ -48,7 +51,10 @@ class Cluster(object): self.attache_group = AttacheGroup() + self.events = six.moves.queue.Queue() # for coolamqp.clustering.events.* + self.snr = SingleNodeReconnector(self.node, self.attache_group, self.listener) + self.snr.on_fail.add(lambda: self.events.put_nowait(ConnectionLost())) # Spawn a transactional publisher and a noack publisher self.pub_tr = Publisher(Publisher.MODE_CNPUB) @@ -57,7 +63,19 @@ class Cluster(object): self.attache_group.add(self.pub_tr) self.attache_group.add(self.pub_na) - self.events = Queue() # for + def drain(self, timeout): + """ + Return an Event. + :param timeout: time to wait for an event. 0 means return immediately. None means block forever + :return: an Event instance. NothingMuch is returned when there's nothing within a given timoeout + """ + try: + if timeout == 0: + return self.events.get_nowait() + else: + return self.events.get(True, timeout) + except six.moves.queue.Empty: + return THE_POPE_OF_NOPE def consume(self, queue, on_message=None, *args, **kwargs): """ @@ -76,7 +94,7 @@ class Cluster(object): :return: a tuple (Consumer instance, and a Future), that tells, when consumer is ready """ fut = Future() - on_message = on_message or self.events.put_nowait + on_message = on_message or (lambda rmsg: self.events.put_nowait(MessageReceived(rmsg))) con = Consumer(queue, on_message, future_to_notify=fut, *args, **kwargs) self.attache_group.add(con) return con, fut diff --git a/coolamqp/clustering/events.py b/coolamqp/clustering/events.py new file mode 100644 index 0000000000000000000000000000000000000000..a45f896a4be921fee81274bc5b472ea0fd3a609a --- /dev/null +++ b/coolamqp/clustering/events.py @@ -0,0 +1,57 @@ +# coding=UTF-8 +""" +Cluster will emit Events. + +They mean that something, like, happened. +""" +from __future__ import print_function, absolute_import, division +import six +import time +import logging +import monotonic +from coolamqp.objects import ReceivedMessage + +logger = logging.getLogger(__name__) + + +class Event(object): + """ + An event emitted by Cluster + """ + +class Timestamped(Event): + """ + Notes the time of the event (as result of .monotonic) + """ + def __init__(self): + super(Timestamped, self).__init__() + self.ts = monotonic.monotonic() + + +class NothingMuch(Event): + """Nothing happened :D""" + + +class ConnectionLost(Timestamped): + """ + We have lost the connection. + + NOTE that we don't have a ConnectionUp, since re-establishing a connection + is a complicated process. Broker might not have recognized the failure, + and some queues will be blocked, some might be ok, and the state + might be just a bit noisy. + + Please examine your Consumer's .state's to check whether link was regained + """ + + +class MessageReceived(ReceivedMessage, Timestamped): + """ + Something that works as an ersatz ReceivedMessage, but is an event + """ + def __init__(self, msg): + """:type msg: ReceivedMessage""" + ReceivedMessage.__init__(self, msg.body, msg.exchange_name, msg.routing_key, + properties=msg.properties, delivery_tag=msg.delivery_tag, + ack=msg.ack, nack=msg.nack) + Timestamped.__init__(self) diff --git a/coolamqp/clustering/single.py b/coolamqp/clustering/single.py index f635163107fcbe63dbc368ee3e35dec5e940673b..fe2b97fa9ba1f141078751cacf2afe2c5b3cd835 100644 --- a/coolamqp/clustering/single.py +++ b/coolamqp/clustering/single.py @@ -4,6 +4,7 @@ import six import logging from coolamqp.uplink import Connection +from coolamqp.objects import Callable logger = logging.getLogger(__name__) @@ -21,6 +22,10 @@ class SingleNodeReconnector(object): self.terminating = False + self.on_fail = Callable() #: public + + self.on_fail.add(self._on_fail) + def is_connected(self): return self.connection is not None @@ -31,9 +36,9 @@ class SingleNodeReconnector(object): self.connection = Connection(self.node_def, self.listener_thread) self.attache_group.attach(self.connection) self.connection.start() - self.connection.add_finalizer(self.on_fail) + self.connection.finalize.add(self.on_fail) - def on_fail(self): + def _on_fail(self): if self.terminating: return diff --git a/coolamqp/objects.py b/coolamqp/objects.py index 31a099f7d8b9ce430f930188be01b78c68178124..064f228ba18908b55cbdc34ceb05b4fd2010b4c0 100644 --- a/coolamqp/objects.py +++ b/coolamqp/objects.py @@ -17,6 +17,26 @@ logger = logging.getLogger(__name__) EMPTY_PROPERTIES = MessageProperties() +class Callable(object): + """ + Add a bunch of callables to one list, and just invoke'm. + INTERNAL USE ONLY + """ + def __init__(self, oneshots=False): + """:param oneshots: if True, callables will be called and discarded""" + self.callables = [] + self.oneshots = oneshots + + def add(self, callable): + self.callables.append(callable) + + def __call__(self, *args, **kwargs): + for callable in self.callables: + callable(*args, **kwargs) + if self.oneshots: + self.callables = [] + + class Message(object): """ An AMQP message. Has a binary body, and some properties. @@ -34,12 +54,12 @@ class Message(object): Please take care with passing empty bodies, as py-amqp has some failure on it. :param body: stream of octets - :type body: str (py2) or bytes (py3) + :type body: anything with a buffer interface :param properties: AMQP properties to be sent along. default is 'no properties at all' You can pass a dict - it will be passed to MessageProperties, but it's slow - don't do that. - :type properties: MessageProperties instance, None or a dict + :type properties: MessageProperties instance, None or a dict (SLOW!) """ if isinstance(body, six.text_type): raise TypeError(u'body cannot be a text type!') @@ -56,6 +76,7 @@ class Message(object): LAMBDA_NONE = lambda: None + class ReceivedMessage(Message): """ A message that was received from the AMQP broker. @@ -159,6 +180,7 @@ class Queue(object): class Future(concurrent.futures.Future): """ + INTERNAL USE ONLY Future returned by asynchronous CoolAMQP methods. A strange future (only one thread may wait for it) diff --git a/coolamqp/uplink/connection/connection.py b/coolamqp/uplink/connection/connection.py index 41f9f348873f39c9bd2875baf5af872bef0734d1..c192512001eca4b0a52bc29586cb1e4616ff2d96 100644 --- a/coolamqp/uplink/connection/connection.py +++ b/coolamqp/uplink/connection/connection.py @@ -13,7 +13,7 @@ from coolamqp.uplink.handshake import Handshaker from coolamqp.framing.definitions import ConnectionClose, ConnectionCloseOk from coolamqp.uplink.connection.watches import MethodWatch from coolamqp.uplink.connection.states import ST_ONLINE, ST_OFFLINE, ST_CONNECTING - +from coolamqp.objects import Callable logger = logging.getLogger(__name__) @@ -53,7 +53,7 @@ class Connection(object): self.watches = {} # channel => list of [Watch instance] self.any_watches = [] # list of Watches that should check everything - self.finalizers = [] + self.finalize = Callable(oneshots=True) #: public self.state = ST_CONNECTING @@ -117,19 +117,6 @@ class Connection(object): self.sendf = SendingFramer(self.listener_socket.send) self.watch_for_method(0, (ConnectionClose, ConnectionCloseOk), self.on_connection_close) - def add_finalizer(self, callable): - """ - Add a callable to be executed when all watches were failed and we're really going down. - - Finalizers are not used for logic stuff, but for situations like making TCP reconnects. - When we are making a reconnect, we need to be sure that all watches fired - so logic is intact. - - DO NOT PUT CALLABLES THAT HAVE TO DO WITH STATE THINGS, ESPECIALLY ATTACHES. - - :param callable: callable/0 - """ - self.finalizers.append(callable) - def on_fail(self): """ Called by event loop when the underlying connection is closed. @@ -159,8 +146,7 @@ class Connection(object): self.any_watches = [] # call finalizers - while len(self.finalizers) > 0: - self.finalizers.pop()() + self.finalize() def on_connection_close(self, payload): """ diff --git a/setup.py b/setup.py index 2a2fb2a897cfc51c10c2b25f4c0d42f2065a46b2..ab02421d2b77d8ac5e0ff5e9480c6667762dfc39 100644 --- a/setup.py +++ b/setup.py @@ -9,7 +9,7 @@ setup(name=u'CoolAMQP', author=u'DMS Serwis s.c.', author_email=u'piotrm@smok.co', url=u'https://github.com/smok-serwis/coolamqp', -# download_url='https://github.com/smok-serwis/coolamqp/archive/v0.12.zip', + download_url='https://github.com/smok-serwis/coolamqp/archive/v0.12.zip', keywords=['amqp', 'rabbitmq', 'client', 'network', 'ha', 'high availability'], packages=[ 'coolamqp', @@ -23,7 +23,8 @@ setup(name=u'CoolAMQP', 'coolamqp.framing.compilation', ], license=u'MIT License', - long_description=u'Best-in-class Python AMQP client', + long_description=u'''AMQP client, but with dynamic class generation and memoryviews FOR THE GODSPEED. +Also, handles your reconnects and transactionality THE RIGHT WAY''', requires=['amqp', 'six', 'monotonic'], tests_require=["nose"], test_suite='nose.collector', diff --git a/tests/test_clustering/test_a.py b/tests/test_clustering/test_a.py index 8cadf4c24435bbb1817c13181751e4969d51f953..67ebff937b88bea4b93acd7049a713a7a75af6fb 100644 --- a/tests/test_clustering/test_a.py +++ b/tests/test_clustering/test_a.py @@ -7,7 +7,7 @@ import six import unittest import time, logging, threading from coolamqp.objects import Message, MessageProperties, NodeDefinition, Queue, ReceivedMessage -from coolamqp.clustering import Cluster +from coolamqp.clustering import Cluster, MessageReceived, NothingMuch import time @@ -124,4 +124,13 @@ class TestA(unittest.TestCase): def test_consumer_cancel(self): con, fut = self.c.consume(Queue(u'hello', exclusive=True, auto_delete=True)) fut.result() - con.cancel().result() \ No newline at end of file + con.cancel().result() + + def test_drain_1(self): + con, fut = self.c.consume(Queue(u'hello', exclusive=True, auto_delete=True)) + fut.result() + + self.c.publish(Message(b'ioi'), routing_key=u'hello') + + self.assertIsInstance(self.c.drain(2), MessageReceived) + self.assertIsInstance(self.c.drain(1), NothingMuch)