diff --git a/.gitignore b/.gitignore index cb4ba96eeee4e415e284f6c644d931b517ffa269..f42c2b3c86f7cef0dbb423186b340ff18e37b34d 100644 --- a/.gitignore +++ b/.gitignore @@ -89,4 +89,5 @@ ENV/ # Rope project settings .ropeproject -# Created by .ignore support plugin (hsz.mobi) +.vagrant/ +.idea/ diff --git a/coolamqp/cluster.py b/coolamqp/cluster.py index b04e79eabefef0eeea9efe7aff44303af8a064e1..ab366e5d778e9fee0623963e37cd3d82f36eacb3 100644 --- a/coolamqp/cluster.py +++ b/coolamqp/cluster.py @@ -53,7 +53,18 @@ class ClusterNode(object): class Cluster(object): """ - Represents connection to an AMQP cluster. This internally connects only to one node. + Represents connection to an AMQP cluster. This internally connects only to one node, but + will select another one upon connection failing. + + You can pass callbacks to most commands. They will also return an Order instance, + that you can wait for to know an operation has completed. + + Callbacks are executed before Order is marked as complete (it's .result() returns), so if you do: + + cluster.send(.., on_completed=hello).result() + bye() + + hello will be called before bye is called. """ def __init__(self, nodes, backend=PyAMQPBackend): @@ -89,7 +100,8 @@ class Cluster(object): def declare_exchange(self, exchange, on_completed=None, on_failed=None): """ - Declare an exchange + Declare an exchange. It will be re-declared upon reconnection. + :param exchange: Exchange to declare :param on_completed: callable/0 to call when this succeeds :param on_failed: callable/1 to call when this fails with AMQPError instance @@ -103,7 +115,8 @@ class Cluster(object): """ Declares a queue. - NOTE THAT IF YOU declare + !!!! If you declare a queue and NOT consume from it, it will not be re-declared + upon reconnection !!!! :param queue: Queue to declare :param on_completed: callable/0 to call when this succeeds diff --git a/coolamqp/handler.py b/coolamqp/handler.py index c0fa2e23f5729a21db731ec12f001dbbe0400281..84e5ea718dc1a3cd44e37290134d806d795583c7 100644 --- a/coolamqp/handler.py +++ b/coolamqp/handler.py @@ -7,8 +7,7 @@ import collections import time from .backends import ConnectionFailedError, RemoteAMQPError, Cancelled from .messages import Exchange -from .events import ConnectionUp, ConnectionDown, ConsumerCancelled, \ - MessageReceived +from .events import ConnectionUp, ConnectionDown, ConsumerCancelled, MessageReceived from .orders import SendMessage, DeclareExchange, ConsumeQueue, CancelQueue, \ AcknowledgeMessage, NAcknowledgeMessage, DeleteQueue, \ DeleteExchange, SetQoS, DeclareQueue @@ -124,11 +123,7 @@ class ClusterHandlerThread(threading.Thread): self.backend.queue_declare(order.queue) if order.queue.exchange is not None: - if isinstance(order.queue.exchange, Exchange): - self.backend.queue_bind(order.queue, order.queue.exchange) - else: - for exchange in order.queue.exchange: - self.backend.queue_bind(order.queue, order.queue.exchange) + self.backend.queue_bind(order.queue, order.queue.exchange) self.backend.basic_consume(order.queue) self.queues_by_consumer_tags[order.queue.consumer_tag] = order.queue @@ -155,9 +150,7 @@ class ClusterHandlerThread(threading.Thread): else: order.completed() - def run(self): - self._reconnect() - + def __run_wrap(self): # throws _ImOuttaHere # Loop while there are things to do while (not self.is_terminating) or (len(self.order_queue) > 0): try: @@ -166,25 +159,29 @@ class ClusterHandlerThread(threading.Thread): # just drain shit self.backend.process(max_time=1) - except _ImOuttaHere: # thrown only if self.complete_outstanding_upon_termination - assert self.complete_outstanding_upon_termination - break except ConnectionFailedError as e: logger.warning('Connection to broker lost') self.cluster.connected = False self.event_queue.put(ConnectionDown()) - try: - self._reconnect() - except _ImOuttaHere: - break + self._reconnect() + + def run(self): + try: + self._reconnect() + self.__run_wrap() + except _ImOuttaHere: + pass assert self.is_terminating - if (not self.cluster.connected) or (self.backend is not None): - self.backend.shutdown() + if self.cluster.connected or (self.backend is not None): + try: + self.backend.shutdown() + except AttributeError: + pass # backend might be None - the if condition is "or" after all + self.backend = None self.cluster.connected = False - def terminate(self): """ Called by Cluster. Tells to finish all jobs and quit. diff --git a/coolamqp/orders.py b/coolamqp/orders.py index 9be573523d999600239dd631b7baf7346d836f07..ca2bdee919b333f58a8830980e1ab399068b7b3a 100644 --- a/coolamqp/orders.py +++ b/coolamqp/orders.py @@ -5,14 +5,30 @@ Orders that can be dispatched to ClusterHandlerThread from threading import Lock +_NOOP_COMP = lambda: None +_NOOP_FAIL = lambda e: None + + class Order(object): """Base class for orders dispatched to ClusterHandlerThread""" def __init__(self, on_completed=None, on_failed=None): - self.on_completed = on_completed - self.on_failed = on_failed + """ + Please note that callbacks will be executed BEFORE the lock is released, + but after .result is updated, ie. if + you have something like + + amqp.send(.., on_completed=hello).result() + bye() + + then hello() will be called BEFORE bye(). + Callbacks are called from CoolAMQP's internal thread + """ + self.on_completed = on_completed or _NOOP_COMP + self.on_failed = on_failed or _NOOP_FAIL self._result = None # None on non-completed # True on completed OK # exception instance on failed + # private self.lock = Lock() self.lock.acquire() self.cancelled = False @@ -27,21 +43,17 @@ class Order(object): def completed(self): self._result = True + self.on_completed() self.lock.release() - if self.on_completed is not None: - self.on_completed() - def failed(self, e): """ :param e: AMQPError instance or Cancelled instance """ self._result = e + self.on_failed(e) self.lock.release() - if self.on_failed is not None: - self.on_failed(e) - def result(self): """Wait until this is completed and return a response""" self.lock.acquire() diff --git a/setup.cfg b/setup.cfg index 224a77957f5db48dfa25c8bb4a35f535202da203..5b368da59afe67383812313007206ff3129174fb 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,2 +1,5 @@ [metadata] -description-file = README.md \ No newline at end of file +description-file = README.md + +[pycodestyle] +max-line-length=120 \ No newline at end of file diff --git a/tests/test_basics.py b/tests/test_basics.py index 092acd20ebe3257d5e9d34a51f45944dea3d2eec..bbd387a6173b0a207bbdf42c7e7699481bd7a785 100644 --- a/tests/test_basics.py +++ b/tests/test_basics.py @@ -57,7 +57,6 @@ class TestBasics(unittest.TestCase): self.assertIsInstance(p, MessageReceived) self.assertEquals(six.binary_type(p.message.body), 'what the fuck') - def test_bug_hangs(self): p = Queue('lol', exclusive=True) self.amqp.consume(p) @@ -129,6 +128,11 @@ class TestBasics(unittest.TestCase): self.assertIsInstance(self.amqp.drain(wait=10), ConsumerCancelled) + def test_delete_exchange(self): + xchg = Exchange('a_fanout', type='fanout') + self.amqp.declare_exchange(xchg) + self.amqp.delete_exchange(xchg).result() + def test_exchanges(self): xchg = Exchange('a_fanout', type='fanout') self.amqp.declare_exchange(xchg)