diff --git a/coolamqp/attaches/__init__.py b/coolamqp/attaches/__init__.py index 5075d303b83e0c24bb739eb6716c7a8c914b0304..aaf4ba03b078c77641b5e53ac00e20f80a032acc 100644 --- a/coolamqp/attaches/__init__.py +++ b/coolamqp/attaches/__init__.py @@ -15,3 +15,4 @@ EVERYTHING HERE IS CALLED BY LISTENER THREAD UNLESS STATED OTHERWISE. from coolamqp.attaches.consumer import Consumer from coolamqp.attaches.publisher import Publisher from coolamqp.attaches.agroup import AttacheGroup +from coolamqp.attaches.declarer import Declarer \ No newline at end of file diff --git a/coolamqp/attaches/channeler.py b/coolamqp/attaches/channeler.py index 791f5f5b81eed3077e2fc353bc4a67caca41ef77..9485d920cb53537291c7a09568578bdca596c6ed 100644 --- a/coolamqp/attaches/channeler.py +++ b/coolamqp/attaches/channeler.py @@ -90,6 +90,10 @@ class Channeler(Attache): This will, therefore, get called an even number of times. + Called by Channeler, when: + - Channeler.on_close gets called and state is ST_ONLINE + on_close registers ChannelClose, ChannelCloseOk, BasicCancel + :param operational: True if channel has just become operational, False if it has just become useless. """ @@ -97,9 +101,11 @@ class Channeler(Attache): """ [EXTEND ME] Handler for channeler destruction. - Called on: - - channel exception - - connection failing + Channeler registers this for: + (None - socket dead) + (BasicCancel, ChannelCloseOk, ChannelClose) + + This method provides to send a response for ChannelClose This handles following situations: - payload is None: this means that connection has gone down hard, so our Connection object is @@ -126,6 +132,10 @@ class Channeler(Attache): # teardown already done return + if isinstance(payload, ChannelClose): + # it would still be good to reply with channel.close-ok + self.method(ChannelCloseOk()) + if self.state == ST_ONLINE: # The channel has just lost operationality! self.on_operational(False) @@ -140,7 +150,7 @@ class Channeler(Attache): self.channel_id = None if isinstance(payload, ChannelClose): - logger.debug('Channel closed: %s %s', payload.reply_code, payload.reply_text) + logger.debug('Channel closed: %s %s', payload.reply_code, payload.reply_text.tobytes()) def methods(self, payloads): """ diff --git a/coolamqp/attaches/consumer.py b/coolamqp/attaches/consumer.py index c6f208766aef424ea3af85674112780a636498a5..269ae0a11a0dc2ca4425649c2cd9bdc6eb1e7121 100644 --- a/coolamqp/attaches/consumer.py +++ b/coolamqp/attaches/consumer.py @@ -231,11 +231,11 @@ class Consumer(Channeler): :param sth: AMQPMethodFrame WITH basic-deliver, AMQPHeaderFrame or AMQPBodyFrame """ if isinstance(sth, BasicDeliver): - self.receiver.on_basic_deliver(sth) + self.receiver.on_basic_deliver(sth) elif isinstance(sth, AMQPBodyFrame): - self.receiver.on_body(sth.data) + self.receiver.on_body(sth.data) elif isinstance(sth, AMQPHeaderFrame): - self.receiver.on_head(sth) + self.receiver.on_head(sth) # No point in listening for more stuff, that's all the watches even listen for @@ -285,7 +285,7 @@ class Consumer(Channeler): elif isinstance(payload, QueueDeclareOk): # did we need an anonymous name? if self.queue.anonymous: - self.queue.name = payload.queue_name.tobytes() + self.queue.name = payload.queue.tobytes() # We need any form of binding. if self.queue.exchange is not None: diff --git a/coolamqp/attaches/declarer.py b/coolamqp/attaches/declarer.py new file mode 100644 index 0000000000000000000000000000000000000000..097726953d647d7aef036198a905913217605c2e --- /dev/null +++ b/coolamqp/attaches/declarer.py @@ -0,0 +1,196 @@ +# coding=UTF-8 +""" +queue.declare, exchange.declare and that shit +""" +from __future__ import print_function, absolute_import, division +import six +import collections +import logging +from coolamqp.framing.definitions import ChannelOpenOk, ExchangeDeclare, ExchangeDeclareOk, QueueDeclare, \ + QueueDeclareOk, ChannelClose +from coolamqp.attaches.channeler import Channeler, ST_ONLINE, ST_OFFLINE +from coolamqp.attaches.utils import AtomicTagger, FutureConfirmableRejectable, Synchronized + +from coolamqp.objects import Future, Exchange, Queue, Callable +from coolamqp.exceptions import AMQPError, ConnectionDead + +logger = logging.getLogger(__name__) + + + +class Operation(object): + """ + An abstract operation. + + This class possesses the means to carry itself out and report back status. + Represents the op currently carried out. + + This will register it's own callback. Please, call on_connection_dead when connection is broken + to fail futures with ConnectionDead, since this object does not watch for Fails + """ + def __init__(self, declarer, obj, fut=None): + self.done = False + self.fut = fut + self.declarer = declarer + self.obj = obj + + self.on_done = Callable() # callable/0 + + def on_connection_dead(self): + """To be called by declarer when our link fails""" + if self.fut is not None: + self.fut.set_exception(ConnectionDead()) + self.fut = None + + def perform(self): + """Attempt to perform this op.""" + obj = self.obj + if isinstance(obj, Exchange): + self.declarer.method_and_watch(ExchangeDeclare(self.obj.name.encode('utf8'), obj.type, False, obj.durable, + obj.auto_delete, False, False, []), + (ExchangeDeclareOk, ChannelClose), + self._callback) + elif isinstance(obj, Queue): + self.declarer.method_and_watch(QueueDeclare(obj.name, False, obj.durable, obj.exclusive, obj.auto_delete, False, []), + (QueueDeclareOk, ChannelClose), + self._callback) + + def _callback(self, payload): + assert not self.done + self.done = True + if isinstance(payload, ChannelClose): + if self.fut is not None: + self.fut.set_exception(AMQPError(payload)) + self.fut = None + else: + # something that had no Future failed. Is it in declared? + if self.obj in self.declarer.declared: + self.declarer.declared.remove(self.obj) #todo access not threadsafe + self.declarer.on_discard(self.obj) + else: + if self.fut is not None: + self.fut.set_result() + self.fut = None + self.declarer.on_operation_done() + + +class Declarer(Channeler, Synchronized): + """ + Doing other things, such as declaring, deleting and other stuff. + + This also maintains a list of declared queues/exchanges, and redeclares them on each reconnect. + """ + def __init__(self): + """ + Create a new declarer. + """ + Channeler.__init__(self) + Synchronized.__init__(self) + + self.declared = set() # since Queues and Exchanges are hashable... + # anonymous queues aren't, but we reject those + # persistent + + self.left_to_declare = collections.deque() # since last disconnect. persistent+transient + # deque of Operation objects + + self.on_discard = Callable() # callable/1, with discarded elements + + self.in_process = None # Operation instance that is being progressed right now + + def on_close(self, payload=None): + + # we are interested in ChannelClose during order execution, + # because that means that operation was illegal, and must + # be discarded/exceptioned on future + + if payload is None: + + if self.in_process is not None: + self.in_process.on_connection_dead() + self.in_process = None + + # connection down, panic mode engaged. + while len(self.left_to_declare) > 0: + self.left_to_declare.pop().on_connection_dead() + + # recast current declarations as new operations + for dec in self.declared: + self.left_to_declare.append(Operation(self, dec)) + + super(Declarer, self).on_close() + return + + elif isinstance(payload, ChannelClose): + # Looks like a soft fail - we may try to survive that + old_con = self.connection + super(Declarer, self).on_close() + + # But, we are super optimists. If we are not cancelled, and connection is ok, + # we must reestablish + if old_con.state == ST_ONLINE and not self.cancelled: + self.attach(old_con) + else: + super(Declarer, self).on_close(payload) + + def on_operation_done(self): + """ + Called by operation, when it's complete (whether success or fail). + Not called when operation fails due to DC + """ + self.in_process = None + self._do_operations() + + def declare(self, obj, persistent=False): + """ + Schedule to have an object declared. + + Future is returned, so that user knows when it happens. + + Exchange declarations never fail. + Of course they do, but you will be told that it succeeded. This is by design, + and due to how AMQP works. + + Queue declarations CAN fail. + + Note that if re-declaring these fails, they will be silently discarded. + You can subscribe an on_discard(Exchange | Queue) here. + + :param obj: Exchange or Queue instance + :param persistent: will be redeclared upon disconnect. To remove, use "undeclare" + :return: a Future instance + :raise ValueError: tried to declare anonymous queue + """ + if isinstance(obj, Queue): + if obj.anonymous: + raise ValueError('Cannot declare anonymous queue') + + fut = Future() + + if persistent: + if obj not in self.declared: + self.declared.add(obj) #todo access not threadsafe + + self.left_to_declare.append(Operation(self, obj, fut)) + self._do_operations() + + return fut + + @Synchronized.synchronized + def _do_operations(self): + """ + Attempt to execute something. + + To be called when it's possible that something can be done + """ + if (self.state != ST_ONLINE) or len(self.left_to_declare) == 0 or (self.in_process is not None): + return + + self.in_process = self.left_to_declare.popleft() + self.in_process.perform() + + def on_setup(self, payload): + if isinstance(payload, ChannelOpenOk): + assert self.in_process is None + self.state = ST_ONLINE + self._do_operations() diff --git a/coolamqp/attaches/publisher.py b/coolamqp/attaches/publisher.py index d8a38d9a3b830dcc9c6af51661b69cb5db0b40e0..32458a3dc5cfee988707459ade8dedde9af33cd2 100644 --- a/coolamqp/attaches/publisher.py +++ b/coolamqp/attaches/publisher.py @@ -38,6 +38,8 @@ logger = logging.getLogger(__name__) CnpubMessageSendOrder = collections.namedtuple('CnpubMessageSendOrder', ('message', 'exchange_name', 'routing_key', 'future')) +#todo what if publisher in MODE_CNPUB fails mid message? they dont seem to be recovered + class Publisher(Channeler, Synchronized): """ diff --git a/coolamqp/attaches/utils.py b/coolamqp/attaches/utils.py index 74d0aaa9a06e3a19ec8885dc3a92847fc334ece7..1940f92bd80e05a066944d9bf900f438a3125b13 100644 --- a/coolamqp/attaches/utils.py +++ b/coolamqp/attaches/utils.py @@ -219,6 +219,9 @@ class Synchronized(object): def __init__(self): self._monitor_lock = threading.Lock() + def get_monitor_lock(self): + return self._monitor_lock + @staticmethod def synchronized(fun): @functools.wraps(fun) diff --git a/coolamqp/clustering/cluster.py b/coolamqp/clustering/cluster.py index 686c23b771ef59e0d1aff949ca28a1dfdf9cf5c7..e44608a639e6e9a0f5c25dbf2bde33aa2eead585 100644 --- a/coolamqp/clustering/cluster.py +++ b/coolamqp/clustering/cluster.py @@ -9,7 +9,7 @@ import warnings import time from coolamqp.uplink import ListenerThread from coolamqp.clustering.single import SingleNodeReconnector -from coolamqp.attaches import Publisher, AttacheGroup, Consumer +from coolamqp.attaches import Publisher, AttacheGroup, Consumer, Declarer from coolamqp.objects import Future, Exchange @@ -27,6 +27,8 @@ class Cluster(object): This has ListenerThread. Call .start() to connect to AMQP. + + It is not safe to fork() after .start() is called, but it's OK before. """ # Events you can be informed about @@ -46,22 +48,17 @@ class Cluster(object): if len(nodes) > 1: raise NotImplementedError(u'Multiple nodes not supported yet') - self.listener = ListenerThread() self.node, = nodes - self.attache_group = AttacheGroup() - - self.events = six.moves.queue.Queue() # for coolamqp.clustering.events.* - - self.snr = SingleNodeReconnector(self.node, self.attache_group, self.listener) - self.snr.on_fail.add(lambda: self.events.put_nowait(ConnectionLost())) - - # Spawn a transactional publisher and a noack publisher - self.pub_tr = Publisher(Publisher.MODE_CNPUB) - self.pub_na = Publisher(Publisher.MODE_NOACK) - self.attache_group.add(self.pub_tr) - self.attache_group.add(self.pub_na) + def declare(self, obj, persistent=False): + """ + Declare a Queue/Exchange + :param obj: Queue/Exchange object + :param persistent: should it be redefined upon reconnect? + :return: Future + """ + return self.decl.declare(obj, persistent=persistent) def drain(self, timeout): """ @@ -129,9 +126,33 @@ class Cluster(object): def start(self, wait=True): """ - Connect to broker. + Connect to broker. Initialize Cluster. + + Only after this call is Cluster usable. + It is not safe to fork after this. + :param wait: block until connection is ready """ + self.listener = ListenerThread() + + self.attache_group = AttacheGroup() + + self.events = six.moves.queue.Queue() # for coolamqp.clustering.events.* + + self.snr = SingleNodeReconnector(self.node, self.attache_group, self.listener) + self.snr.on_fail.add(lambda: self.events.put_nowait(ConnectionLost())) + + # Spawn a transactional publisher and a noack publisher + self.pub_tr = Publisher(Publisher.MODE_CNPUB) + self.pub_na = Publisher(Publisher.MODE_NOACK) + self.decl = Declarer() + + self.attache_group.add(self.pub_tr) + self.attache_group.add(self.pub_na) + self.attache_group.add(self.decl) + + + self.listener.init() self.listener.start() self.snr.connect() diff --git a/coolamqp/exceptions.py b/coolamqp/exceptions.py index 4603aaeeb5f7bde3c20fdaf1c3eb3999d89d3f84..dec32c0dc7ba221925b43b3c31bced88f41b5c05 100644 --- a/coolamqp/exceptions.py +++ b/coolamqp/exceptions.py @@ -6,6 +6,13 @@ class CoolAMQPError(Exception): """Base class for CoolAMQP errors""" +class ConnectionDead(CoolAMQPError): + """ + Operation could be not completed because some other error + than a legit AMQPError occurred, such as exploding kitten + """ + + class AMQPError(CoolAMQPError): """ Base class for errors received from AMQP server diff --git a/coolamqp/objects.py b/coolamqp/objects.py index 4cb5f978a981ad1a7467b282df799d4977d46038..49b990044f7224495cb0ff5a7cc9befa21f9ab8d 100644 --- a/coolamqp/objects.py +++ b/coolamqp/objects.py @@ -6,6 +6,7 @@ import threading import uuid import six import logging +import warnings import concurrent.futures from coolamqp.framing.definitions import BasicContentPropertyList as MessageProperties @@ -130,15 +131,20 @@ class Exchange(object): self.name = name if isinstance(type, six.text_type): type = type.encode('utf8') + warnings.warn(u'type should be a binary type') self.type = type # must be bytes self.durable = durable self.auto_delete = auto_delete + def __repr__(self): + return u'Exchange(%s, %s, %s, %s)' % (repr(self.name), repr(self.type), repr(self.durable), repr(self.auto_delete)) + def __hash__(self): return self.name.__hash__() def __eq__(self, other): - return self.name == other.name + return (self.name == other.name) and (type(self) == type(other)) + Exchange.direct = Exchange() @@ -174,7 +180,7 @@ class Queue(object): self.consumer_tag = name if name != '' else uuid.uuid4().hex # consumer tag to use in AMQP comms def __eq__(self, other): - return self.name == other.name + return (self.name == other.name) and (type(self) == type(other)) def __hash__(self): return hash(self.name) diff --git a/coolamqp/uplink/connection/connection.py b/coolamqp/uplink/connection/connection.py index 7fac7c25b71f146ff0d0127767ea62bdab31d925..84e5be3c75c43b43b98faddc176ee6f52c262409 100644 --- a/coolamqp/uplink/connection/connection.py +++ b/coolamqp/uplink/connection/connection.py @@ -250,9 +250,10 @@ class Connection(object): ListenerThread's on_fail callback. """ try: + #todo why is that necessary? it doesnt pass travis CI if there's no this block self.listener_socket.oneshot(delay, callback) except AttributeError: - print(dir(self)) + pass #print(dir(self)) def unwatch_all(self, channel_id): """ diff --git a/coolamqp/uplink/handshake.py b/coolamqp/uplink/handshake.py index 25711809529427d7b9695c918b12ea91dacc44b2..c3ea13fd0dd678c03a0db2aeb207e2fe2d9b4f48 100644 --- a/coolamqp/uplink/handshake.py +++ b/coolamqp/uplink/handshake.py @@ -15,14 +15,14 @@ CONSUMER_CANCEL_NOTIFY = b'consumer_cancel_notify' SUPPORTED_EXTENSIONS = [ PUBLISHER_CONFIRMS, - CONSUMER_CANCEL_NOTIFY + CONSUMER_CANCEL_NOTIFY # half assed support - we just .cancel the consumer, see #12 ] CLIENT_DATA = [ # because RabbitMQ is some kind of a fascist and does not allow # these fields to be of type short-string (b'product', (b'CoolAMQP', 'S')), - (b'version', (b'0.81', 'S')), + (b'version', (b'0.82', 'S')), (b'copyright', (b'Copyright (C) 2016-2017 DMS Serwis', 'S')), (b'information', (b'Licensed under the MIT License.\nSee https://github.com/smok-serwis/coolamqp for details', 'S')), (b'capabilities', ([(capa, (True, 't')) for capa in SUPPORTED_EXTENSIONS], 'F')), diff --git a/coolamqp/uplink/listener/epoll_listener.py b/coolamqp/uplink/listener/epoll_listener.py index fa67e0bee10d96bb4ec46b778c974fc23ee64bd7..16e8926b1056b905cb40196b8289d83c0b1354df 100644 --- a/coolamqp/uplink/listener/epoll_listener.py +++ b/coolamqp/uplink/listener/epoll_listener.py @@ -86,9 +86,11 @@ class EpollListener(object): sock.on_read() if event & select.EPOLLOUT: + if sock.on_write(): # I'm done with sending for now - self.epoll.modify(sock.fileno(), RW) + assert len(sock.data_to_send) == 0 and len(sock.priority_queue) == 0 + self.epoll.modify(sock.fileno(), RO) except SocketFailed: self.epoll.unregister(fd) diff --git a/coolamqp/uplink/listener/socket.py b/coolamqp/uplink/listener/socket.py index 9684243ab9ceeae7d4a35a20ed7f43f7219f055d..a24d0c101b8f9fa52dc9f654dbccda222e63c6af 100644 --- a/coolamqp/uplink/listener/socket.py +++ b/coolamqp/uplink/listener/socket.py @@ -105,7 +105,7 @@ class BaseSocket(object): :raises SocketFailed: on socket error :return: True if I'm done sending shit for now """ - if self.is_failed: return + if self.is_failed: return False while True: if len(self.data_to_send) == 0: diff --git a/coolamqp/uplink/listener/thread.py b/coolamqp/uplink/listener/thread.py index 6e5d2dce995a97545852fe6fd087ba381d5acc38..013bd23db0d3fbb168accd51bb7ccc1c574a7202 100644 --- a/coolamqp/uplink/listener/thread.py +++ b/coolamqp/uplink/listener/thread.py @@ -17,11 +17,14 @@ class ListenerThread(threading.Thread): threading.Thread.__init__(self, name='coolamqp/ListenerThread') self.daemon = True self.terminating = False - self.listener = EpollListener() def terminate(self): self.terminating = True + def init(self): + """Called before start. It is not safe to fork after this""" + self.listener = EpollListener() + def run(self): while not self.terminating: self.listener.wait(timeout=1) diff --git a/setup.py b/setup.py index fab0acbae496215fa269a04e7f576cd3235ede72..928ce75da08710fb093b0349f88e30d5d2b310f6 100644 --- a/setup.py +++ b/setup.py @@ -4,12 +4,12 @@ from setuptools import setup setup(name=u'CoolAMQP', - version='0.81', + version='0.82', description=u'The fastest AMQP client', author=u'DMS Serwis s.c.', author_email=u'piotrm@smok.co', url=u'https://github.com/smok-serwis/coolamqp', - download_url='https://github.com/smok-serwis/coolamqp/archive/v0.81.zip', + download_url='https://github.com/smok-serwis/coolamqp/archive/v0.82.zip', keywords=['amqp', 'rabbitmq', 'client', 'network', 'ha', 'high availability'], packages=[ 'coolamqp', diff --git a/tests/run.py b/tests/run.py index 57fc7b1f5201b349ec4f9a1e3659ca6b077cecc2..4c1a8591e9b2f3021d8189ff2dd3d7e1fecc30e7 100644 --- a/tests/run.py +++ b/tests/run.py @@ -1,7 +1,8 @@ # coding=UTF-8 from __future__ import absolute_import, division, print_function import time, logging, threading -from coolamqp.objects import Message, MessageProperties, NodeDefinition, Queue +from coolamqp.objects import Message, MessageProperties, NodeDefinition, Queue, Exchange +from coolamqp.exceptions import AMQPError from coolamqp.clustering import Cluster import time @@ -14,11 +15,16 @@ if __name__ == '__main__': amqp = Cluster([NODE]) amqp.start(wait=True) + a = Exchange(u'jolax', type='fanout', auto_delete=True) + bad = Exchange(u'jolax', type='direct', auto_delete=True) - c1 = amqp.consume(Queue(b'siema-eniu', exclusive=True), qos=(None, 20)) - c2 = amqp.consume(Queue(b'jo-malina', exclusive=True)) + amqp.declare(a).result() - while True: - time.sleep(30) + try: + amqp.declare(bad).result() + except AMQPError: + print(':)') + + time.sleep(30) amqp.shutdown(True) diff --git a/tests/test_clustering/test_a.py b/tests/test_clustering/test_a.py index 67ebff937b88bea4b93acd7049a713a7a75af6fb..ad88e738986fc7159f021e0e48ededb241f3b004 100644 --- a/tests/test_clustering/test_a.py +++ b/tests/test_clustering/test_a.py @@ -5,8 +5,8 @@ Test things from __future__ import print_function, absolute_import, division import six import unittest -import time, logging, threading -from coolamqp.objects import Message, MessageProperties, NodeDefinition, Queue, ReceivedMessage +import time, logging, threading, monotonic +from coolamqp.objects import Message, MessageProperties, NodeDefinition, Queue, ReceivedMessage, Exchange from coolamqp.clustering import Cluster, MessageReceived, NothingMuch import time @@ -30,6 +30,13 @@ class TestA(unittest.TestCase): fut.result() con.cancel() + def test_actually_waits(self): + a = monotonic.monotonic() + + self.c.drain(5) + + self.assertTrue(monotonic.monotonic() - a >= 4) + def test_set_qos_but_later(self): con, fut = self.c.consume(Queue(u'hello', exclusive=True)) @@ -45,6 +52,13 @@ class TestA(unittest.TestCase): self.assertEquals(con.qos, (0, 110)) + def test_anonymq(self): + q = Queue(exchange=Exchange(u'ooo', type=b'fanout', auto_delete=True), auto_delete=True) + + c, f = self.c.consume(q) + + f.result() + def test_send_recv_zerolen(self): P = {'q': False} diff --git a/tests/test_clustering/test_exchanges.py b/tests/test_clustering/test_exchanges.py index 55c58f8635be6af05a5ac564f3516e35e3be270d..a070d2bd94d82e6d7777cc3b39eaff916e063fc2 100644 --- a/tests/test_clustering/test_exchanges.py +++ b/tests/test_clustering/test_exchanges.py @@ -5,7 +5,7 @@ import unittest import time, logging, threading from coolamqp.objects import Message, MessageProperties, NodeDefinition, Queue, ReceivedMessage, Exchange from coolamqp.clustering import Cluster, MessageReceived, NothingMuch - +from coolamqp.exceptions import AMQPError import time #todo handle bad auth @@ -21,6 +21,12 @@ class TestExchanges(unittest.TestCase): def tearDown(self): self.c.shutdown() + def test_declare_exchange(self): + a = Exchange(u'jolax', type=b'fanout', auto_delete=True) + bad = Exchange(u'jolax', type=b'topic', auto_delete=True) + + self.c.declare(a).result() + self.c.declare(bad).result() # succeeds nevertheless def test_fanout(self): x = Exchange(u'jola', type='direct', auto_delete=True)