diff --git a/coolamqp/attaches/channeler.py b/coolamqp/attaches/channeler.py index 332c83fcdf440af95c14ec2edc5a4eae5328b11a..9485d920cb53537291c7a09568578bdca596c6ed 100644 --- a/coolamqp/attaches/channeler.py +++ b/coolamqp/attaches/channeler.py @@ -90,6 +90,10 @@ class Channeler(Attache): This will, therefore, get called an even number of times. + Called by Channeler, when: + - Channeler.on_close gets called and state is ST_ONLINE + on_close registers ChannelClose, ChannelCloseOk, BasicCancel + :param operational: True if channel has just become operational, False if it has just become useless. """ @@ -97,9 +101,11 @@ class Channeler(Attache): """ [EXTEND ME] Handler for channeler destruction. - Called on: - - channel exception - - connection failing + Channeler registers this for: + (None - socket dead) + (BasicCancel, ChannelCloseOk, ChannelClose) + + This method provides to send a response for ChannelClose This handles following situations: - payload is None: this means that connection has gone down hard, so our Connection object is @@ -144,7 +150,7 @@ class Channeler(Attache): self.channel_id = None if isinstance(payload, ChannelClose): - logger.debug('Channel closed: %s %s', payload.reply_code, payload.reply_text) + logger.debug('Channel closed: %s %s', payload.reply_code, payload.reply_text.tobytes()) def methods(self, payloads): """ diff --git a/coolamqp/attaches/declarer.py b/coolamqp/attaches/declarer.py index fb3d730419b0c3157149e4fb8d63b04f504ff2ce..00dafb32bc9805b712453cd0ea6e05692c8f205a 100644 --- a/coolamqp/attaches/declarer.py +++ b/coolamqp/attaches/declarer.py @@ -21,6 +21,9 @@ logger = logging.getLogger(__name__) class Operation(object): """ + An abstract operation. + + This class possesses the means to carry itself out and report back status. Represents the op currently carried out. This will register it's own callback. Please, call on_connection_dead when connection is broken @@ -44,25 +47,32 @@ class Operation(object): """Attempt to perform this op.""" obj = self.obj if isinstance(obj, Exchange): - self.method_and_watch(ExchangeDeclare(self.obj.name.encode('utf8'), obj.type, False, obj.durable, + self.declarer.method_and_watch(ExchangeDeclare(self.obj.name.encode('utf8'), obj.type, False, obj.durable, obj.auto_delete, False, False, []), (ExchangeDeclareOk, ChannelClose), - self._declared) + self._callback) elif isinstance(obj, Queue): - self.method_and_watch(QueueDeclare(obj.name, False, obj.durable, obj.exclusive, obj.auto_delete, False, []), + self.declarer.method_and_watch(QueueDeclare(obj.name, False, obj.durable, obj.exclusive, obj.auto_delete, False, []), (QueueDeclareOk, ChannelClose), - self._declared) + self._callback) def _callback(self, payload): + assert not self.done + self.done = True if isinstance(payload, ChannelClose): if self.fut is not None: self.fut.set_exception(AMQPError(payload)) self.fut = None + else: + # something that had no Future failed. Is it in declared? + if self.obj in self.declarer.declared: + self.declarer.declared.remove(self.obj) #todo access not threadsafe + self.declarer.on_discard(self.obj) else: if self.fut is not None: self.fut.set_result() self.fut = None - + self.declarer.on_operation_done() class Declarer(Channeler, Synchronized): @@ -87,43 +97,50 @@ class Declarer(Channeler, Synchronized): self.on_discard = Callable() # callable/1, with discarded elements - self.doing_now = None # Operation instance that is being progressed right now + self.in_process = None # Operation instance that is being progressed right now - @Synchronized.synchronized - def attach(self, connection): - Channeler.attach(self, connection) - connection.watch(FailWatch(self.on_fail)) + def on_close(self, payload=None): - @Synchronized.synchronized - def on_fail(self): - self.state = ST_OFFLINE + # we are interested in ChannelClose during order execution, + # because that means that operation was illegal, and must + # be discarded/exceptioned on future - # fail all operations in queue... - while len(self.left_to_declare) > 0: - self.left_to_declare.pop().on_connection_dead() + if payload is None: - if self.now_future is not None: - # we were processing something... - self.now_future.set_exception(ConnectionDead()) - self.now_future = None - self.now_processed = None + if self.in_process is not None: + self.in_process.on_connection_dead() + self.in_process = None + # connection down, panic mode engaged. + while len(self.left_to_declare) > 0: + self.left_to_declare.pop().on_connection_dead() + # recast current declarations as new operations + for dec in self.declared: + self.left_to_declare.append(Operation(self, dec)) - def on_close(self, payload=None): - old_con = self.connection - super(Declarer, self).on_close(payload=payload) + super(Declarer, self).on_close() + return - # But, we are super optimists. If we are not cancelled, and connection is ok, - # we must reestablish - if old_con.state == ST_ONLINE and not self.cancelled: - self.attach(old_con) + elif isinstance(payload, ChannelClose): + # Looks like a soft fail - we may try to survive that + old_con = self.connection + super(Declarer, self).on_close() - if payload is None: - # Oh, it's pretty bad. We will need to redeclare... - for obj in self.declared: - self.left_to_declare + # But, we are super optimists. If we are not cancelled, and connection is ok, + # we must reestablish + if old_con.state == ST_ONLINE and not self.cancelled: + self.attach(old_con) + else: + super(Declarer, self).on_close(payload) + def on_operation_done(self): + """ + Called by operation, when it's complete (whether success or fail). + Not called when operation fails due to DC + """ + self.in_process = None + self._do_operations() def declare(self, obj, persistent=False): """ @@ -132,7 +149,7 @@ class Declarer(Channeler, Synchronized): Future is returned, so that user knows when it happens. Declaring is not fast, because there is at most one declare at wire, but at least we know WHAT failed. - Declaring same thing twice is a no-op. + Note that if re-declaring these fails, they will be silently discarded. You can subscribe an on_discard(Exchange | Queue) here. @@ -142,41 +159,36 @@ class Declarer(Channeler, Synchronized): :return: a Future instance :raise ValueError: tried to declare anonymous queue """ - if isinstance(obj, Queue): if obj.anonymous: raise ValueError('Cannot declare anonymous queue') - if obj in self.declared: - return - fut = Future() if persistent: - self.declared.add(obj) + if obj not in self.declared: + self.declared.add(obj) #todo access not threadsafe - op = Operation(self, obj, fut) - - self.left_to_declare.append(op) - - if self.state == ST_ONLINE: - self._do_operations() + self.left_to_declare.append(Operation(self, obj, fut)) + self._do_operations() return fut - @Synchronized.synchronized def _do_operations(self): - """Attempt to do something""" - if len(self.left_to_declare) == 0 or self.busy: + """ + Attempt to execute something. + + To be called when it's possible that something can be done + """ + if (self.state != ST_ONLINE) or len(self.left_to_declare) == 0 or (self.in_process is not None): return - else: - self.now_processed, self.now_future = self.left_to_declare.popleft() - self.busy = True + self.in_process = self.left_to_declare.popleft() + self.in_process.perform() def on_setup(self, payload): - if isinstance(payload, ChannelOpenOk): - self.busy = False + assert self.in_process is None + self.state = ST_ONLINE self._do_operations() diff --git a/coolamqp/attaches/publisher.py b/coolamqp/attaches/publisher.py index d8a38d9a3b830dcc9c6af51661b69cb5db0b40e0..32458a3dc5cfee988707459ade8dedde9af33cd2 100644 --- a/coolamqp/attaches/publisher.py +++ b/coolamqp/attaches/publisher.py @@ -38,6 +38,8 @@ logger = logging.getLogger(__name__) CnpubMessageSendOrder = collections.namedtuple('CnpubMessageSendOrder', ('message', 'exchange_name', 'routing_key', 'future')) +#todo what if publisher in MODE_CNPUB fails mid message? they dont seem to be recovered + class Publisher(Channeler, Synchronized): """ diff --git a/coolamqp/clustering/cluster.py b/coolamqp/clustering/cluster.py index 0db1d0644a7e658246d0f6cac218751b64bef82f..a0a5f44786e1b49d508abec74c1ce9cae2e96bd2 100644 --- a/coolamqp/clustering/cluster.py +++ b/coolamqp/clustering/cluster.py @@ -65,6 +65,15 @@ class Cluster(object): self.attache_group.add(self.pub_na) self.attache_group.add(self.decl) + def declare(self, obj, persistent=False): + """ + Declare a Queue/Exchange + :param obj: Queue/Exchange object + :param persistent: should it be redefined upon reconnect? + :return: Future + """ + return self.decl.declare(obj, persistent=persistent) + def drain(self, timeout): """ Return an Event. diff --git a/coolamqp/objects.py b/coolamqp/objects.py index 61e6047b035507cea93191dca01d2363b405b28f..c0ee31c167cc97e7382648c957b4f48d984143b4 100644 --- a/coolamqp/objects.py +++ b/coolamqp/objects.py @@ -126,12 +126,15 @@ class Exchange(object): direct = None # the direct exchange - def __init__(self, name=u'', type='direct', durable=True, auto_delete=False): + def __init__(self, name=u'', type=u'direct', durable=True, auto_delete=False): self.name = name self.type = type self.durable = durable self.auto_delete = auto_delete + def __repr__(self): + return u'Exchange(%s, %s, %s, %s)' % (repr(self.name), repr(self.type), repr(self.durable), repr(self.auto_delete)) + def __hash__(self): return self.name.__hash__() diff --git a/coolamqp/uplink/connection/connection.py b/coolamqp/uplink/connection/connection.py index 7fac7c25b71f146ff0d0127767ea62bdab31d925..cb9384f111b21410d7cbe7594d0132172abca127 100644 --- a/coolamqp/uplink/connection/connection.py +++ b/coolamqp/uplink/connection/connection.py @@ -250,6 +250,7 @@ class Connection(object): ListenerThread's on_fail callback. """ try: + #todo why is that necessary? it doesnt pass travis CI if there's no this block self.listener_socket.oneshot(delay, callback) except AttributeError: print(dir(self)) diff --git a/tests/run.py b/tests/run.py index 57fc7b1f5201b349ec4f9a1e3659ca6b077cecc2..9aefc071be7e3c4859d13dd2f2c7bae1822d467e 100644 --- a/tests/run.py +++ b/tests/run.py @@ -1,7 +1,8 @@ # coding=UTF-8 from __future__ import absolute_import, division, print_function import time, logging, threading -from coolamqp.objects import Message, MessageProperties, NodeDefinition, Queue +from coolamqp.objects import Message, MessageProperties, NodeDefinition, Queue, Exchange +from coolamqp.exceptions import AMQPError from coolamqp.clustering import Cluster import time @@ -14,9 +15,15 @@ if __name__ == '__main__': amqp = Cluster([NODE]) amqp.start(wait=True) + a = Exchange(u'jola', type='fanout', auto_delete=True, durable=False) + bad = Exchange(u'jola', type='direct', auto_delete=True, durable=True) - c1 = amqp.consume(Queue(b'siema-eniu', exclusive=True), qos=(None, 20)) - c2 = amqp.consume(Queue(b'jo-malina', exclusive=True)) + amqp.declare(a).result() + + try: + amqp.declare(bad).result() + except AMQPError: + print(':)') while True: time.sleep(30) diff --git a/tests/test_clustering/test_exchanges.py b/tests/test_clustering/test_exchanges.py index 55c58f8635be6af05a5ac564f3516e35e3be270d..39cf11014c235d79fbd82e87c4ac427eed761bfd 100644 --- a/tests/test_clustering/test_exchanges.py +++ b/tests/test_clustering/test_exchanges.py @@ -5,7 +5,7 @@ import unittest import time, logging, threading from coolamqp.objects import Message, MessageProperties, NodeDefinition, Queue, ReceivedMessage, Exchange from coolamqp.clustering import Cluster, MessageReceived, NothingMuch - +from coolamqp.exceptions import AMQPError import time #todo handle bad auth @@ -21,6 +21,14 @@ class TestExchanges(unittest.TestCase): def tearDown(self): self.c.shutdown() + def test_declare_exchange(self): + a = Exchange(u'jola', type=b'fanout', auto_delete=True) + bad = Exchange(u'jola', type=b'topic', auto_delete=True) + + self.c.declare(a) + + self.assertRaises(AMQPError, lambda: self.c.declare(bad).result()) + self.assertRaises(AMQPError, lambda: self.c.declare(bad).result()) def test_fanout(self): x = Exchange(u'jola', type='direct', auto_delete=True)