From 9f0cd03c0a5f949b6e39a0ba32e2fa516f9f6c02 Mon Sep 17 00:00:00 2001 From: Piotr Maslanka <piotr.maslanka@henrietta.com.pl> Date: Mon, 9 Jan 2017 23:18:38 +0100 Subject: [PATCH] idk --- coolamqp/attaches/__init__.py | 2 + coolamqp/attaches/agroup.py | 3 -- coolamqp/attaches/channeler.py | 6 +-- coolamqp/attaches/publisher.py | 13 ++++++- coolamqp/persistence/__init__.py | 43 ++++++++++++++++++++++ coolamqp/uplink/__init__.py | 4 +- coolamqp/uplink/connection/watches.py | 3 ++ coolamqp/uplink/listener/epoll_listener.py | 1 + coolamqp/uplink/listener/socket.py | 10 ++++- tests/run.py | 9 ++--- 10 files changed, 79 insertions(+), 15 deletions(-) create mode 100644 coolamqp/persistence/__init__.py diff --git a/coolamqp/attaches/__init__.py b/coolamqp/attaches/__init__.py index 9af36c4..5075d30 100644 --- a/coolamqp/attaches/__init__.py +++ b/coolamqp/attaches/__init__.py @@ -8,6 +8,8 @@ The attache becomes then responsible for closing this channel. Attache should also register at least one on_fail watch, so it can handle things if they go south. Multiple attaches can be "abstracted" as single one via AttacheGroup (which is also an Attache) + +EVERYTHING HERE IS CALLED BY LISTENER THREAD UNLESS STATED OTHERWISE. """ from coolamqp.attaches.consumer import Consumer diff --git a/coolamqp/attaches/agroup.py b/coolamqp/attaches/agroup.py index 28c4983..05a43b9 100644 --- a/coolamqp/attaches/agroup.py +++ b/coolamqp/attaches/agroup.py @@ -35,12 +35,10 @@ class AttacheGroup(Attache): :param attache: Attache instance """ assert attache not in self.attaches - print('Adding %s' % (attache, )) self.attaches.append(attache) # If we have any connection, and it's not dead, attach if self.connection is not None and self.connection.state != ST_OFFLINE: - print('Attach to me %s' % (attache, )) attache.attach(self.connection) if isinstance(attache, Consumer): @@ -67,7 +65,6 @@ class AttacheGroup(Attache): for attache in self.attaches: if not attache.cancelled: if attache.connection != connection: - print('Attach to me %s' % (attache, )) attache.attach(connection) diff --git a/coolamqp/attaches/channeler.py b/coolamqp/attaches/channeler.py index 4c298e7..405ab75 100644 --- a/coolamqp/attaches/channeler.py +++ b/coolamqp/attaches/channeler.py @@ -118,16 +118,16 @@ class Channeler(Attache): if payload is None: # Connection went down HARD - self.connection.free_channels.put(self.channel_id) + self.connection.free_channels.append(self.channel_id) self.channel_id = None elif isinstance(payload, ChannelClose): # We have failed print('Channel close: RC=%s RT=%s', payload.reply_code, payload.reply_text) - self.connection.free_channels.put(self.channel_id) + self.connection.free_channels.append(self.channel_id) self.channel_id = None elif isinstance(payload, ChannelCloseOk): - self.connection.free_channels.put(self.channel_id) + self.connection.free_channels.append(self.channel_id) self.channel_id = None else: raise Exception('Unrecognized payload - did you forget to handle something? :D') diff --git a/coolamqp/attaches/publisher.py b/coolamqp/attaches/publisher.py index 5f08120..10e93c9 100644 --- a/coolamqp/attaches/publisher.py +++ b/coolamqp/attaches/publisher.py @@ -24,7 +24,7 @@ except ImportError: pass from coolamqp.attaches.channeler import Channeler, ST_ONLINE, ST_OFFLINE -from coolamqp.uplink import PUBLISHER_CONFIRMS, MethodWatch +from coolamqp.uplink import PUBLISHER_CONFIRMS, MethodWatch, FailWatch from coolamqp.attaches.utils import AtomicTagger, FutureConfirmableRejectable from coolamqp.objects import Future @@ -78,6 +78,17 @@ class Publisher(Channeler): self.tagger = None # None, or AtomicTagger instance id MODE_CNPUB + def attach(self, connection): + super(Publisher, self).attach(connection) + connection.watch(FailWatch(self.on_fail)) + + def on_fail(self): + """ + Registered as a fail watch for connection + """ + self.state = ST_OFFLINE + self.connection = None + def _pub(self, message, exchange_name, routing_key): """ Just send the message. Sends BasicDeliver + header + body diff --git a/coolamqp/persistence/__init__.py b/coolamqp/persistence/__init__.py new file mode 100644 index 0000000..c26370f --- /dev/null +++ b/coolamqp/persistence/__init__.py @@ -0,0 +1,43 @@ +# coding=UTF-8 +""" +This is the layer that makes your consumers and publishers survive connection losses. +It also renegotiates connections, shall they fail, and implements some sort of exponential delay policy. + +EVERYTHING HERE IS CALLED BY LISTENER THREAD UNLESS STATED OTHERWISE. + +""" +from __future__ import print_function, absolute_import, division +import six +import logging + +from coolamqp.uplink import FailWatch, Connection + +logger = logging.getLogger(__name__) + + + +class SingleNodeReconnector(object): + """ + This has a Listener Thread, a Node Definition, and an attache group, + and tries to keep all the things relatively alive. + """ + + def __init__(self, node_def, attache_group, listener_thread): + self.listener_thread = listener_thread + self.node_def = node_def + self.attache_group = attache_group + self.connection = None + + def connect(self): + assert self.connection is None + + # Initiate connecting + self.connection = Connection(self.node_def, self.listener_thread) + self.connection.start() + self.connection.watch(FailWatch(self.on_fail)) + self.attache_group.attach(self.connection) + + def on_fail(self): + logger.info('Reconnecting...') + self.connection = None + self.connect() diff --git a/coolamqp/uplink/__init__.py b/coolamqp/uplink/__init__.py index 428f48a..cef792f 100644 --- a/coolamqp/uplink/__init__.py +++ b/coolamqp/uplink/__init__.py @@ -8,9 +8,11 @@ Core object here is Connection. This package: You can wait for a particular frame by setting watches on connections. Watches will fire upon an event triggering them. +EVERYTHING HERE IS CALLED BY LISTENER THREAD UNLESS STATED OTHERWISE. + """ from __future__ import absolute_import, division, print_function -from coolamqp.uplink.connection import Connection, HeaderOrBodyWatch, MethodWatch, AnyWatch +from coolamqp.uplink.connection import Connection, HeaderOrBodyWatch, MethodWatch, AnyWatch, FailWatch from coolamqp.uplink.listener import ListenerThread from coolamqp.uplink.handshake import PUBLISHER_CONFIRMS, CONSUMER_CANCEL_NOTIFY diff --git a/coolamqp/uplink/connection/watches.py b/coolamqp/uplink/connection/watches.py index ef61487..1160fe3 100644 --- a/coolamqp/uplink/connection/watches.py +++ b/coolamqp/uplink/connection/watches.py @@ -72,6 +72,9 @@ class FailWatch(Watch): Watch.__init__(self, None, True) self.callable = callable + def is_triggered_by(self, frame): + return False + def fire(self): """ Connection failed! diff --git a/coolamqp/uplink/listener/epoll_listener.py b/coolamqp/uplink/listener/epoll_listener.py index 3177fc9..6e2ccf3 100644 --- a/coolamqp/uplink/listener/epoll_listener.py +++ b/coolamqp/uplink/listener/epoll_listener.py @@ -4,6 +4,7 @@ import six import logging import select import monotonic +import socket import collections import heapq diff --git a/coolamqp/uplink/listener/socket.py b/coolamqp/uplink/listener/socket.py index 3f95d79..5a1fa45 100644 --- a/coolamqp/uplink/listener/socket.py +++ b/coolamqp/uplink/listener/socket.py @@ -37,8 +37,13 @@ class BaseSocket(object): self.data_to_send = collections.deque() self.priority_queue = collections.deque() # when a piece of data is finished, this queue is checked first self.my_on_read = on_read - self.on_fail = on_fail + self._on_fail = on_fail self.on_time = on_time + self.is_failed = False + + def on_fail(self): + self.is_failed = True + self._on_fail() def send(self, data, priority=True): """ @@ -48,6 +53,7 @@ class BaseSocket(object): Note that data will be sent atomically, ie. without interruptions. :param priority: preempt other datas. Property of sending data atomically will be maintained. """ + if self.is_failed: return if priority: self.priority_queue.append(data) else: @@ -71,6 +77,7 @@ class BaseSocket(object): def on_read(self): """Socket is readable, called by Listener""" + if self.is_failed: return try: data = self.sock.recv(2048) except (IOError, socket.error): @@ -90,6 +97,7 @@ class BaseSocket(object): :raises SocketFailed: on socket error :return: True if I'm done sending shit for now """ + if self.is_failed: return while True: if len(self.data_to_send) == 0: diff --git a/tests/run.py b/tests/run.py index 9ed4a25..994b4b3 100644 --- a/tests/run.py +++ b/tests/run.py @@ -7,6 +7,7 @@ from coolamqp.uplink import Connection from coolamqp.attaches import Consumer, Publisher, AttacheGroup from coolamqp.objects import Queue +from coolamqp.persistence import SingleNodeReconnector import time @@ -17,14 +18,12 @@ if __name__ == '__main__': lt = ListenerThread() lt.start() - con = Connection(NODE, lt) - con.start() - ag = AttacheGroup() + snr = SingleNodeReconnector(NODE, ag, lt) + snr.connect() ag.add(Consumer(Queue('siema-eniu'), no_ack=True)) - class IPublishThread(threading.Thread): def __init__(self, ag): super(IPublishThread, self).__init__() @@ -38,8 +37,6 @@ if __name__ == '__main__': pub2.publish(Message(b'you dawg', properties=MessageProperties(content_type='text/plain')), routing_key=b'siema-eniu') - ag.attach(con) - IPublishThread(ag).start() while True: -- GitLab