diff --git a/coolamqp/attaches/__init__.py b/coolamqp/attaches/__init__.py index 9af36c4ba750393136ccd92ff4feb6df1cb22a9b..5075d303b83e0c24bb739eb6716c7a8c914b0304 100644 --- a/coolamqp/attaches/__init__.py +++ b/coolamqp/attaches/__init__.py @@ -8,6 +8,8 @@ The attache becomes then responsible for closing this channel. Attache should also register at least one on_fail watch, so it can handle things if they go south. Multiple attaches can be "abstracted" as single one via AttacheGroup (which is also an Attache) + +EVERYTHING HERE IS CALLED BY LISTENER THREAD UNLESS STATED OTHERWISE. """ from coolamqp.attaches.consumer import Consumer diff --git a/coolamqp/attaches/agroup.py b/coolamqp/attaches/agroup.py index 28c498329614a41cccfa462620234d8727dfa121..05a43b95db422440c2fb5c19fff033df7a9370bc 100644 --- a/coolamqp/attaches/agroup.py +++ b/coolamqp/attaches/agroup.py @@ -35,12 +35,10 @@ class AttacheGroup(Attache): :param attache: Attache instance """ assert attache not in self.attaches - print('Adding %s' % (attache, )) self.attaches.append(attache) # If we have any connection, and it's not dead, attach if self.connection is not None and self.connection.state != ST_OFFLINE: - print('Attach to me %s' % (attache, )) attache.attach(self.connection) if isinstance(attache, Consumer): @@ -67,7 +65,6 @@ class AttacheGroup(Attache): for attache in self.attaches: if not attache.cancelled: if attache.connection != connection: - print('Attach to me %s' % (attache, )) attache.attach(connection) diff --git a/coolamqp/attaches/channeler.py b/coolamqp/attaches/channeler.py index 4c298e7e0cddca06b1c08b3a612dc97b3610e57a..405ab75eb589c25767fce2ff318f6ba0b7d70947 100644 --- a/coolamqp/attaches/channeler.py +++ b/coolamqp/attaches/channeler.py @@ -118,16 +118,16 @@ class Channeler(Attache): if payload is None: # Connection went down HARD - self.connection.free_channels.put(self.channel_id) + self.connection.free_channels.append(self.channel_id) self.channel_id = None elif isinstance(payload, ChannelClose): # We have failed print('Channel close: RC=%s RT=%s', payload.reply_code, payload.reply_text) - self.connection.free_channels.put(self.channel_id) + self.connection.free_channels.append(self.channel_id) self.channel_id = None elif isinstance(payload, ChannelCloseOk): - self.connection.free_channels.put(self.channel_id) + self.connection.free_channels.append(self.channel_id) self.channel_id = None else: raise Exception('Unrecognized payload - did you forget to handle something? :D') diff --git a/coolamqp/attaches/publisher.py b/coolamqp/attaches/publisher.py index 5f08120e24c26d9507b67aed242680db4be34a64..10e93c9d4a60319e441fdffa381237a805d46648 100644 --- a/coolamqp/attaches/publisher.py +++ b/coolamqp/attaches/publisher.py @@ -24,7 +24,7 @@ except ImportError: pass from coolamqp.attaches.channeler import Channeler, ST_ONLINE, ST_OFFLINE -from coolamqp.uplink import PUBLISHER_CONFIRMS, MethodWatch +from coolamqp.uplink import PUBLISHER_CONFIRMS, MethodWatch, FailWatch from coolamqp.attaches.utils import AtomicTagger, FutureConfirmableRejectable from coolamqp.objects import Future @@ -78,6 +78,17 @@ class Publisher(Channeler): self.tagger = None # None, or AtomicTagger instance id MODE_CNPUB + def attach(self, connection): + super(Publisher, self).attach(connection) + connection.watch(FailWatch(self.on_fail)) + + def on_fail(self): + """ + Registered as a fail watch for connection + """ + self.state = ST_OFFLINE + self.connection = None + def _pub(self, message, exchange_name, routing_key): """ Just send the message. Sends BasicDeliver + header + body diff --git a/coolamqp/persistence/__init__.py b/coolamqp/persistence/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..c26370fd050633248a085b288d3ce07f44823179 --- /dev/null +++ b/coolamqp/persistence/__init__.py @@ -0,0 +1,43 @@ +# coding=UTF-8 +""" +This is the layer that makes your consumers and publishers survive connection losses. +It also renegotiates connections, shall they fail, and implements some sort of exponential delay policy. + +EVERYTHING HERE IS CALLED BY LISTENER THREAD UNLESS STATED OTHERWISE. + +""" +from __future__ import print_function, absolute_import, division +import six +import logging + +from coolamqp.uplink import FailWatch, Connection + +logger = logging.getLogger(__name__) + + + +class SingleNodeReconnector(object): + """ + This has a Listener Thread, a Node Definition, and an attache group, + and tries to keep all the things relatively alive. + """ + + def __init__(self, node_def, attache_group, listener_thread): + self.listener_thread = listener_thread + self.node_def = node_def + self.attache_group = attache_group + self.connection = None + + def connect(self): + assert self.connection is None + + # Initiate connecting + self.connection = Connection(self.node_def, self.listener_thread) + self.connection.start() + self.connection.watch(FailWatch(self.on_fail)) + self.attache_group.attach(self.connection) + + def on_fail(self): + logger.info('Reconnecting...') + self.connection = None + self.connect() diff --git a/coolamqp/uplink/__init__.py b/coolamqp/uplink/__init__.py index 428f48aa4c4b6e9ca9e697a815066894544bfd58..cef792f6a97f9813867f7ca9741d5f1783ee03bd 100644 --- a/coolamqp/uplink/__init__.py +++ b/coolamqp/uplink/__init__.py @@ -8,9 +8,11 @@ Core object here is Connection. This package: You can wait for a particular frame by setting watches on connections. Watches will fire upon an event triggering them. +EVERYTHING HERE IS CALLED BY LISTENER THREAD UNLESS STATED OTHERWISE. + """ from __future__ import absolute_import, division, print_function -from coolamqp.uplink.connection import Connection, HeaderOrBodyWatch, MethodWatch, AnyWatch +from coolamqp.uplink.connection import Connection, HeaderOrBodyWatch, MethodWatch, AnyWatch, FailWatch from coolamqp.uplink.listener import ListenerThread from coolamqp.uplink.handshake import PUBLISHER_CONFIRMS, CONSUMER_CANCEL_NOTIFY diff --git a/coolamqp/uplink/connection/watches.py b/coolamqp/uplink/connection/watches.py index ef61487823e00ff610d81fb8c5caad220061e7f0..1160fe36366d52ff088051eb9d45f805e94a706f 100644 --- a/coolamqp/uplink/connection/watches.py +++ b/coolamqp/uplink/connection/watches.py @@ -72,6 +72,9 @@ class FailWatch(Watch): Watch.__init__(self, None, True) self.callable = callable + def is_triggered_by(self, frame): + return False + def fire(self): """ Connection failed! diff --git a/coolamqp/uplink/listener/epoll_listener.py b/coolamqp/uplink/listener/epoll_listener.py index 3177fc9b88e13a39905c6ac97651d73bd055fe20..6e2ccf3a011d6b5a84fba6c347e6f9137a744d16 100644 --- a/coolamqp/uplink/listener/epoll_listener.py +++ b/coolamqp/uplink/listener/epoll_listener.py @@ -4,6 +4,7 @@ import six import logging import select import monotonic +import socket import collections import heapq diff --git a/coolamqp/uplink/listener/socket.py b/coolamqp/uplink/listener/socket.py index 3f95d797f0cd99d81c4145726a1324e2b0e4f6e3..5a1fa4575a46e614737a1a559acb5bb0c9d1b97d 100644 --- a/coolamqp/uplink/listener/socket.py +++ b/coolamqp/uplink/listener/socket.py @@ -37,8 +37,13 @@ class BaseSocket(object): self.data_to_send = collections.deque() self.priority_queue = collections.deque() # when a piece of data is finished, this queue is checked first self.my_on_read = on_read - self.on_fail = on_fail + self._on_fail = on_fail self.on_time = on_time + self.is_failed = False + + def on_fail(self): + self.is_failed = True + self._on_fail() def send(self, data, priority=True): """ @@ -48,6 +53,7 @@ class BaseSocket(object): Note that data will be sent atomically, ie. without interruptions. :param priority: preempt other datas. Property of sending data atomically will be maintained. """ + if self.is_failed: return if priority: self.priority_queue.append(data) else: @@ -71,6 +77,7 @@ class BaseSocket(object): def on_read(self): """Socket is readable, called by Listener""" + if self.is_failed: return try: data = self.sock.recv(2048) except (IOError, socket.error): @@ -90,6 +97,7 @@ class BaseSocket(object): :raises SocketFailed: on socket error :return: True if I'm done sending shit for now """ + if self.is_failed: return while True: if len(self.data_to_send) == 0: diff --git a/tests/run.py b/tests/run.py index 9ed4a25c513d6ca085e764a7c3a5bf29426408f6..994b4b390292ac4078dcb6697e6be4f3e64df07e 100644 --- a/tests/run.py +++ b/tests/run.py @@ -7,6 +7,7 @@ from coolamqp.uplink import Connection from coolamqp.attaches import Consumer, Publisher, AttacheGroup from coolamqp.objects import Queue +from coolamqp.persistence import SingleNodeReconnector import time @@ -17,14 +18,12 @@ if __name__ == '__main__': lt = ListenerThread() lt.start() - con = Connection(NODE, lt) - con.start() - ag = AttacheGroup() + snr = SingleNodeReconnector(NODE, ag, lt) + snr.connect() ag.add(Consumer(Queue('siema-eniu'), no_ack=True)) - class IPublishThread(threading.Thread): def __init__(self, ag): super(IPublishThread, self).__init__() @@ -38,8 +37,6 @@ if __name__ == '__main__': pub2.publish(Message(b'you dawg', properties=MessageProperties(content_type='text/plain')), routing_key=b'siema-eniu') - ag.attach(con) - IPublishThread(ag).start() while True: