diff --git a/coolamqp/attaches/__init__.py b/coolamqp/attaches/__init__.py index 5075d303b83e0c24bb739eb6716c7a8c914b0304..aaf4ba03b078c77641b5e53ac00e20f80a032acc 100644 --- a/coolamqp/attaches/__init__.py +++ b/coolamqp/attaches/__init__.py @@ -15,3 +15,4 @@ EVERYTHING HERE IS CALLED BY LISTENER THREAD UNLESS STATED OTHERWISE. from coolamqp.attaches.consumer import Consumer from coolamqp.attaches.publisher import Publisher from coolamqp.attaches.agroup import AttacheGroup +from coolamqp.attaches.declarer import Declarer \ No newline at end of file diff --git a/coolamqp/attaches/channeler.py b/coolamqp/attaches/channeler.py index 791f5f5b81eed3077e2fc353bc4a67caca41ef77..332c83fcdf440af95c14ec2edc5a4eae5328b11a 100644 --- a/coolamqp/attaches/channeler.py +++ b/coolamqp/attaches/channeler.py @@ -126,6 +126,10 @@ class Channeler(Attache): # teardown already done return + if isinstance(payload, ChannelClose): + # it would still be good to reply with channel.close-ok + self.method(ChannelCloseOk()) + if self.state == ST_ONLINE: # The channel has just lost operationality! self.on_operational(False) diff --git a/coolamqp/attaches/declarer.py b/coolamqp/attaches/declarer.py new file mode 100644 index 0000000000000000000000000000000000000000..fb3d730419b0c3157149e4fb8d63b04f504ff2ce --- /dev/null +++ b/coolamqp/attaches/declarer.py @@ -0,0 +1,182 @@ +# coding=UTF-8 +""" +queue.declare, exchange.declare and that shit +""" +from __future__ import print_function, absolute_import, division +import six +import collections +import logging +from coolamqp.framing.definitions import ChannelOpenOk, ExchangeDeclare, ExchangeDeclareOk, QueueDeclare, \ + QueueDeclareOk, ChannelClose +from coolamqp.attaches.channeler import Channeler, ST_ONLINE, ST_OFFLINE +from coolamqp.uplink import PUBLISHER_CONFIRMS, MethodWatch, FailWatch +from coolamqp.attaches.utils import AtomicTagger, FutureConfirmableRejectable, Synchronized + +from coolamqp.objects import Future, Exchange, Queue, Callable +from coolamqp.exceptions import AMQPError, ConnectionDead + +logger = logging.getLogger(__name__) + + + +class Operation(object): + """ + Represents the op currently carried out. + + This will register it's own callback. Please, call on_connection_dead when connection is broken + to fail futures with ConnectionDead, since this object does not watch for Fails + """ + def __init__(self, declarer, obj, fut=None): + self.done = False + self.fut = fut + self.declarer = declarer + self.obj = obj + + self.on_done = Callable() # callable/0 + + def on_connection_dead(self): + """To be called by declarer when our link fails""" + if self.fut is not None: + self.fut.set_exception(ConnectionDead()) + self.fut = None + + def perform(self): + """Attempt to perform this op.""" + obj = self.obj + if isinstance(obj, Exchange): + self.method_and_watch(ExchangeDeclare(self.obj.name.encode('utf8'), obj.type, False, obj.durable, + obj.auto_delete, False, False, []), + (ExchangeDeclareOk, ChannelClose), + self._declared) + elif isinstance(obj, Queue): + self.method_and_watch(QueueDeclare(obj.name, False, obj.durable, obj.exclusive, obj.auto_delete, False, []), + (QueueDeclareOk, ChannelClose), + self._declared) + + def _callback(self, payload): + if isinstance(payload, ChannelClose): + if self.fut is not None: + self.fut.set_exception(AMQPError(payload)) + self.fut = None + else: + if self.fut is not None: + self.fut.set_result() + self.fut = None + + + +class Declarer(Channeler, Synchronized): + """ + Doing other things, such as declaring, deleting and other stuff. + + This also maintains a list of declared queues/exchanges, and redeclares them on each reconnect. + """ + def __init__(self): + """ + Create a new declarer. + """ + Channeler.__init__(self) + Synchronized.__init__(self) + + self.declared = set() # since Queues and Exchanges are hashable... + # anonymous queues aren't, but we reject those + # persistent + + self.left_to_declare = collections.deque() # since last disconnect. persistent+transient + # deque of Operation objects + + self.on_discard = Callable() # callable/1, with discarded elements + + self.doing_now = None # Operation instance that is being progressed right now + + @Synchronized.synchronized + def attach(self, connection): + Channeler.attach(self, connection) + connection.watch(FailWatch(self.on_fail)) + + @Synchronized.synchronized + def on_fail(self): + self.state = ST_OFFLINE + + # fail all operations in queue... + while len(self.left_to_declare) > 0: + self.left_to_declare.pop().on_connection_dead() + + if self.now_future is not None: + # we were processing something... + self.now_future.set_exception(ConnectionDead()) + self.now_future = None + self.now_processed = None + + + + def on_close(self, payload=None): + old_con = self.connection + super(Declarer, self).on_close(payload=payload) + + # But, we are super optimists. If we are not cancelled, and connection is ok, + # we must reestablish + if old_con.state == ST_ONLINE and not self.cancelled: + self.attach(old_con) + + if payload is None: + # Oh, it's pretty bad. We will need to redeclare... + for obj in self.declared: + self.left_to_declare + + + def declare(self, obj, persistent=False): + """ + Schedule to have an object declared. + + Future is returned, so that user knows when it happens. + + Declaring is not fast, because there is at most one declare at wire, but at least we know WHAT failed. + Declaring same thing twice is a no-op. + + Note that if re-declaring these fails, they will be silently discarded. + You can subscribe an on_discard(Exchange | Queue) here. + + :param obj: Exchange or Queue instance + :param persistent: will be redeclared upon disconnect. To remove, use "undeclare" + :return: a Future instance + :raise ValueError: tried to declare anonymous queue + """ + + if isinstance(obj, Queue): + if obj.anonymous: + raise ValueError('Cannot declare anonymous queue') + + if obj in self.declared: + return + + fut = Future() + + if persistent: + self.declared.add(obj) + + op = Operation(self, obj, fut) + + self.left_to_declare.append(op) + + if self.state == ST_ONLINE: + self._do_operations() + + return fut + + + @Synchronized.synchronized + def _do_operations(self): + """Attempt to do something""" + if len(self.left_to_declare) == 0 or self.busy: + return + else: + self.now_processed, self.now_future = self.left_to_declare.popleft() + self.busy = True + + + def on_setup(self, payload): + + if isinstance(payload, ChannelOpenOk): + self.busy = False + self._do_operations() diff --git a/coolamqp/attaches/utils.py b/coolamqp/attaches/utils.py index 74d0aaa9a06e3a19ec8885dc3a92847fc334ece7..1940f92bd80e05a066944d9bf900f438a3125b13 100644 --- a/coolamqp/attaches/utils.py +++ b/coolamqp/attaches/utils.py @@ -219,6 +219,9 @@ class Synchronized(object): def __init__(self): self._monitor_lock = threading.Lock() + def get_monitor_lock(self): + return self._monitor_lock + @staticmethod def synchronized(fun): @functools.wraps(fun) diff --git a/coolamqp/clustering/cluster.py b/coolamqp/clustering/cluster.py index 686c23b771ef59e0d1aff949ca28a1dfdf9cf5c7..0db1d0644a7e658246d0f6cac218751b64bef82f 100644 --- a/coolamqp/clustering/cluster.py +++ b/coolamqp/clustering/cluster.py @@ -9,7 +9,7 @@ import warnings import time from coolamqp.uplink import ListenerThread from coolamqp.clustering.single import SingleNodeReconnector -from coolamqp.attaches import Publisher, AttacheGroup, Consumer +from coolamqp.attaches import Publisher, AttacheGroup, Consumer, Declarer from coolamqp.objects import Future, Exchange @@ -59,9 +59,11 @@ class Cluster(object): # Spawn a transactional publisher and a noack publisher self.pub_tr = Publisher(Publisher.MODE_CNPUB) self.pub_na = Publisher(Publisher.MODE_NOACK) + self.decl = Declarer() self.attache_group.add(self.pub_tr) self.attache_group.add(self.pub_na) + self.attache_group.add(self.decl) def drain(self, timeout): """ diff --git a/coolamqp/exceptions.py b/coolamqp/exceptions.py index 4603aaeeb5f7bde3c20fdaf1c3eb3999d89d3f84..dec32c0dc7ba221925b43b3c31bced88f41b5c05 100644 --- a/coolamqp/exceptions.py +++ b/coolamqp/exceptions.py @@ -6,6 +6,13 @@ class CoolAMQPError(Exception): """Base class for CoolAMQP errors""" +class ConnectionDead(CoolAMQPError): + """ + Operation could be not completed because some other error + than a legit AMQPError occurred, such as exploding kitten + """ + + class AMQPError(CoolAMQPError): """ Base class for errors received from AMQP server diff --git a/coolamqp/objects.py b/coolamqp/objects.py index 9f8635954a8c031420e3d2b586f9861f999f432c..61e6047b035507cea93191dca01d2363b405b28f 100644 --- a/coolamqp/objects.py +++ b/coolamqp/objects.py @@ -136,7 +136,8 @@ class Exchange(object): return self.name.__hash__() def __eq__(self, other): - return self.name == other.name + return (self.name == other.name) and (type(self) == type(other)) + Exchange.direct = Exchange() @@ -172,7 +173,7 @@ class Queue(object): self.consumer_tag = name if name != '' else uuid.uuid4().hex # consumer tag to use in AMQP comms def __eq__(self, other): - return self.name == other.name + return (self.name == other.name) and (type(self) == type(other)) def __hash__(self): return hash(self.name) diff --git a/coolamqp/uplink/handshake.py b/coolamqp/uplink/handshake.py index 25711809529427d7b9695c918b12ea91dacc44b2..c3ea13fd0dd678c03a0db2aeb207e2fe2d9b4f48 100644 --- a/coolamqp/uplink/handshake.py +++ b/coolamqp/uplink/handshake.py @@ -15,14 +15,14 @@ CONSUMER_CANCEL_NOTIFY = b'consumer_cancel_notify' SUPPORTED_EXTENSIONS = [ PUBLISHER_CONFIRMS, - CONSUMER_CANCEL_NOTIFY + CONSUMER_CANCEL_NOTIFY # half assed support - we just .cancel the consumer, see #12 ] CLIENT_DATA = [ # because RabbitMQ is some kind of a fascist and does not allow # these fields to be of type short-string (b'product', (b'CoolAMQP', 'S')), - (b'version', (b'0.81', 'S')), + (b'version', (b'0.82', 'S')), (b'copyright', (b'Copyright (C) 2016-2017 DMS Serwis', 'S')), (b'information', (b'Licensed under the MIT License.\nSee https://github.com/smok-serwis/coolamqp for details', 'S')), (b'capabilities', ([(capa, (True, 't')) for capa in SUPPORTED_EXTENSIONS], 'F')), diff --git a/setup.py b/setup.py index fab0acbae496215fa269a04e7f576cd3235ede72..928ce75da08710fb093b0349f88e30d5d2b310f6 100644 --- a/setup.py +++ b/setup.py @@ -4,12 +4,12 @@ from setuptools import setup setup(name=u'CoolAMQP', - version='0.81', + version='0.82', description=u'The fastest AMQP client', author=u'DMS Serwis s.c.', author_email=u'piotrm@smok.co', url=u'https://github.com/smok-serwis/coolamqp', - download_url='https://github.com/smok-serwis/coolamqp/archive/v0.81.zip', + download_url='https://github.com/smok-serwis/coolamqp/archive/v0.82.zip', keywords=['amqp', 'rabbitmq', 'client', 'network', 'ha', 'high availability'], packages=[ 'coolamqp',