From bb68f9ed1f53aabe49f9bd31b24632cd6be4b3df Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Ma=C5=9Blanka?= <piotr.maslanka@henrietta.com.pl> Date: Mon, 26 Dec 2016 03:25:50 +0100 Subject: [PATCH] further fixes --- LICENSE | 3 + coolamqp/backends/base.py | 2 +- coolamqp/backends/pyamqp.py | 15 ++++ coolamqp/cluster.py | 2 + coolamqp/handler.py | 106 +++++++++++++++++++--------- coolamqp/messages.py | 13 +++- coolamqp/orders.py | 15 ++-- setup.py | 6 +- tests/test_failures.py | 36 +++++----- tests/test_noack.py | 2 +- tests/utils.py | 136 ++++++++++++++++++++++++------------ 11 files changed, 230 insertions(+), 106 deletions(-) diff --git a/LICENSE b/LICENSE index 2aee442..9d8f6d3 100644 --- a/LICENSE +++ b/LICENSE @@ -19,3 +19,6 @@ AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + + +resources/amqp-0-9-1.xml: Copyright (c) 2016 OASIS. All rights reserved. \ No newline at end of file diff --git a/coolamqp/backends/base.py b/coolamqp/backends/base.py index 449ff89..ad1b34f 100644 --- a/coolamqp/backends/base.py +++ b/coolamqp/backends/base.py @@ -140,6 +140,6 @@ class AMQPBackend(object): def shutdown(self): """ Close this connection. - This is not allowed to return anything. + This is not allowed to return anything or raise """ self.cluster_handler_thread = None # break GC cycles diff --git a/coolamqp/backends/pyamqp.py b/coolamqp/backends/pyamqp.py index 826e2b3..ce53194 100644 --- a/coolamqp/backends/pyamqp.py +++ b/coolamqp/backends/pyamqp.py @@ -23,9 +23,22 @@ def translate_exceptions(fun): def q(*args, **kwargs): try: return fun(*args, **kwargs) + except (amqp.exceptions.ConsumerCancelled): + # I did not expect those here. Channel must be really bad. + raise ConnectionFailedError('WTF: '+(e.message if six.PY2 else e.args[0])) + except (amqp.exceptions.Blocked, ) as e: + pass # too bad except (amqp.RecoverableChannelError, amqp.exceptions.NotFound, amqp.exceptions.AccessRefused) as e: + + try: + e.reply_code + except AttributeError: + + + if e. + raise RemoteAMQPError(e.reply_code, e.reply_text) except (IOError, amqp.ConnectionForced, @@ -56,6 +69,7 @@ class PyAMQPBackend(AMQPBackend): def shutdown(self): AMQPBackend.shutdown(self) + print 'BACKEND SHUTDOWN START' try: self.channel.close() except: @@ -64,6 +78,7 @@ class PyAMQPBackend(AMQPBackend): self.connection.close() except: pass + print 'BACKEND SHUTDOWN COMPLETE' @translate_exceptions def process(self, max_time=1): diff --git a/coolamqp/cluster.py b/coolamqp/cluster.py index 1618650..21bbb37 100644 --- a/coolamqp/cluster.py +++ b/coolamqp/cluster.py @@ -94,6 +94,8 @@ class Cluster(object): Will be discarded upon fail. :param on_completed: callable/0 to call when this succeeds :param on_failed: callable/1 to call when this fails with AMQPError instance + or Cancelled instance if user cancelled this order + or Discarded instance if message discarded due to 'discard_on_fail' :return: a Future with this order's status """ a = SendMessage(message, exchange or Exchange.direct, routing_key, diff --git a/coolamqp/handler.py b/coolamqp/handler.py index 3645ba3..1c32fde 100644 --- a/coolamqp/handler.py +++ b/coolamqp/handler.py @@ -46,48 +46,67 @@ class ClusterHandlerThread(threading.Thread): self.qos = None # or tuple (prefetch_size, prefetch_count) if QoS set - def _reconnect(self): - exponential_backoff_delay = 1 + def _reconnect_attempt(self): + """Single attempt to regain connectivity. May raise ConnectionFailedError""" + self.backend = None + if self.backend is not None: + self.backend.shutdown() + self.backend = None - while not self.cluster.connected: - if self.backend is not None: - self.backend.shutdown() - self.backend = None + self.connect_id += 1 + node = six.next(self.cluster.node_to_connect_to) + logger.info('Connecting to %s', node) - self.connect_id += 1 - node = six.next(self.cluster.node_to_connect_to) - logger.info('Connecting to %s', node) + self.backend = self.cluster.backend(node, self) - try: - self.backend = self.cluster.backend(node, self) - - if self.qos is not None: - pre_siz, pre_cou, glob = self.qos - self.backend.basic_qos(pre_siz, pre_cou, glob) - - for exchange in self.declared_exchanges.values(): - self.backend.exchange_declare(exchange) - - failed_queues = [] - for queue, no_ack in self.queues_by_consumer_tags.values(): - try: - self.backend.queue_declare(queue) - if queue.exchange is not None: - self.backend.queue_bind(queue, queue.exchange) - self.backend.basic_consume(queue, no_ack=no_ack) - except RemoteAMQPError as e: - if e.code in (403, 405): # access refused, resource locked + if self.qos is not None: + pre_siz, pre_cou, glob = self.qos + self.backend.basic_qos(pre_siz, pre_cou, glob) + + for exchange in self.declared_exchanges.values(): + self.backend.exchange_declare(exchange) + + failed_queues = [] + for queue, no_ack in self.queues_by_consumer_tags.values(): + while True: + try: + self.backend.queue_declare(queue) + if queue.exchange is not None: + self.backend.queue_bind(queue, queue.exchange) + self.backend.basic_consume(queue, no_ack=no_ack) + logger.info('Consuming from %s no_ack=%s', queue, no_ack) + except RemoteAMQPError as e: + if e.code in (403, 405): # access refused, resource locked + # Ok, queue, what should we do? + if queue.locked_after_reconnect == 'retry': + time.sleep(0.1) + continue # retry until works + elif queue.locked_after_reconnect == 'cancel': self.event_queue.put(ConsumerCancelled(queue, ConsumerCancelled.REFUSED_ON_RECONNECT)) failed_queues.append(queue) + elif queue.locked_after_reconnect == 'defer': + self.order_queue.append(ConsumeQueue(queue, no_ack=no_ack)) + failed_queues.append(queue) + else: + raise Exception('wtf') else: - raise + raise # idk + break - for failed_queue in failed_queues: - del self.queues_by_consumer_tags[failed_queue.consumer_tag] + for failed_queue in failed_queues: + del self.queues_by_consumer_tags[failed_queue.consumer_tag] + def _reconnect(self): + """Regain connectivity to cluster. May block for a very long time, + as it will not """ + exponential_backoff_delay = 1 + + while not self.cluster.connected: + try: + self._reconnect_attempt() except ConnectionFailedError as e: # a connection failure happened :( - logger.warning('Connecting to %s failed due to %s', node, repr(e)) + logger.warning('Connecting failed due to %s while connecting and initial setup', repr(e)) self.cluster.connected = False if self.backend is not None: self.backend.shutdown() @@ -99,6 +118,7 @@ class ClusterHandlerThread(threading.Thread): exponential_backoff_delay = min(60, exponential_backoff_delay * 2) else: + logger.info('Connected to AMQP broker via %s', self.backend) self.cluster.connected = True self.event_queue.put(ConnectionUp(initial=self.first_connect)) self.first_connect = False @@ -159,7 +179,8 @@ class ClusterHandlerThread(threading.Thread): except RemoteAMQPError as e: logger.error('Remote AMQP error: %s', e) order._failed(e) # we are allowed to go on - except ConnectionFailedError: + except ConnectionFailedError as e: + logger.error('Connection failed while %s: %s', order, e) self.order_queue.appendleft(order) raise else: @@ -178,7 +199,24 @@ class ClusterHandlerThread(threading.Thread): logger.warning('Connection to broker lost: %s', e) self.cluster.connected = False self.event_queue.put(ConnectionDown()) - self._reconnect() + + # =========================== remove SendMessagees with discard_on_fail + my_orders = [] # because order_queue is used by many threads + while len(self.order_queue) > 0: + order = self.order_queue.popleft() + if isinstance(order, SendMessage): + if order.message.discard_on_fail: + order._discard() + continue + + my_orders.append(order) + + # Ok, we have them in order of execution. Append-left in reverse order + # to preserve previous order + for order in reversed(my_orders): + my_orders.appendleft(order) + + self._reconnect() def run(self): try: diff --git a/coolamqp/messages.py b/coolamqp/messages.py index ee76e35..2a2dde2 100644 --- a/coolamqp/messages.py +++ b/coolamqp/messages.py @@ -95,9 +95,12 @@ Exchange.direct = Exchange() class Queue(object): """ This object represents a Queue that applications consume from or publish to. + + Caveat: Please note the locked_after_reconnect option in constructor """ - def __init__(self, name='', durable=False, exchange=None, exclusive=False, auto_delete=False): + def __init__(self, name='', durable=False, exchange=None, exclusive=False, auto_delete=False, + locked_after_reconnect='retry'): """ Create a queue definition. @@ -110,6 +113,12 @@ class Queue(object): :param exchange: Exchange for this queue to bind to. None for no binding. :param exclusive: Is this queue exclusive? :param auto_delete: Is this queue auto_delete ? + :param locked_after_reconnect: Behaviour when queue is exclusive and ACCESS_REFUSED/RESOURCE_LOCKED + is seen on reconnect. Because broker might not know that we have failed, 'retry' will + try again until succeeds (default option). This might block for a long time, until the broker + realizes previous connection is dead and deletes the queue. + 'cancel' will return a ConsumerCancelled to client + 'defer' will attempt to configure the queue later, but will not block other tasks from progressing. """ self.name = name # if name is '', this will be filled in with broker-generated name upon declaration @@ -121,3 +130,5 @@ class Queue(object): self.anonymous = name == '' # if this queue is anonymous, it must be regenerated upon reconnect self.consumer_tag = name if name != '' else uuid.uuid4().hex # consumer tag to use in AMQP comms + self.locked_after_reconnect = locked_after_reconnect + assert locked_after_reconnect in ('retry', 'cancel', 'defer') \ No newline at end of file diff --git a/coolamqp/orders.py b/coolamqp/orders.py index 181f77d..2c97858 100644 --- a/coolamqp/orders.py +++ b/coolamqp/orders.py @@ -27,8 +27,11 @@ class Order(object): If this fails, then property .error_code can be read to get the error code. and .reply_text has the reply of the server or some other reason. These are set before callbacks are called. + Error code is None, if not available, or AMQP constants describing errors, eg. 502 for syntax error. + + A discarded or cancelled order is considered FAILED """ self.on_completed = on_completed or _NOOP_COMP self.on_failed = on_failed or _NOOP_FAIL @@ -56,6 +59,12 @@ class Order(object): self.on_completed() self.lock.release() + def _discard(self): # called by handler + from coolamqp.backends.base import Discarded + self.discarded = True + self.on_failed(Discarded()) + self.lock.release() + def _failed(self, e): # called by handler """ :param e: AMQPError instance or Cancelled instance @@ -76,10 +85,9 @@ class Order(object): def has_failed(self): """Return whether the operation failed, ie. completed but with an error code. - User-cancelled operations are not failed. + Cancelled and discarded ops are considered failed. This assumes that this order has been .wait()ed upon""" - assert self._result is not None - return not (self.cancelled or self._result is True) + return self._result is True def result(self): """Wait until this is completed and return a response""" @@ -136,7 +144,6 @@ class ConsumeQueue(_Queue): self.no_ack = no_ack - class DeleteQueue(_Queue): """Delete a queue""" diff --git a/setup.py b/setup.py index 699839a..fe3b9ae 100644 --- a/setup.py +++ b/setup.py @@ -3,12 +3,12 @@ from setuptools import setup setup(name='CoolAMQP', - version='0.11', + version='0.12', description='AMQP client with sane reconnects', author='DMS Serwis s.c.', author_email='piotrm@smok.co', url='https://github.com/smok-serwis/coolamqp', - download_url='https://github.com/smok-serwis/coolamqp/archive/v0.11.zip', + download_url='https://github.com/smok-serwis/coolamqp/archive/v0.12.zip', keywords=['amqp', 'pyamqp', 'rabbitmq', 'client', 'network', 'ha', 'high availability'], packages=[ 'coolamqp', @@ -33,3 +33,5 @@ setup(name='CoolAMQP', 'Topic :: Software Development :: Libraries' ] ) + + diff --git a/tests/test_failures.py b/tests/test_failures.py index 029ec2d..c658026 100644 --- a/tests/test_failures.py +++ b/tests/test_failures.py @@ -2,7 +2,6 @@ from __future__ import absolute_import, division, print_function import unittest -import os import time from coolamqp import Cluster, ClusterNode, Queue, MessageReceived, ConnectionUp, \ ConnectionDown, ConsumerCancelled, Message, Exchange @@ -13,13 +12,15 @@ NODE = ClusterNode('127.0.0.1', 'guest', 'guest') from tests.utils import CoolAMQPTestCase -class TestSpecialCases(unittest.TestCase): +class TestSpecialCases(CoolAMQPTestCase): + INIT_AMQP = False + def test_termination_while_disconnect(self): self.amqp = Cluster([NODE]) self.amqp.start() self.assertIsInstance(self.amqp.drain(wait=1), ConnectionUp) - os.system("sudo service rabbitmq-server stop") + self.fail_amqp() time.sleep(5) self.assertIsInstance(self.amqp.drain(wait=1), ConnectionDown) @@ -27,7 +28,7 @@ class TestSpecialCases(unittest.TestCase): self.assertIsNone(self.amqp.thread.backend) self.assertFalse(self.amqp.connected) - os.system("sudo service rabbitmq-server start") + self.unfail_amqp() class TestFailures(CoolAMQPTestCase): @@ -36,10 +37,11 @@ class TestFailures(CoolAMQPTestCase): self.amqp.cancel(Queue('hello world')).result() def test_longer_disconnects(self): - os.system("sudo service rabbitmq-server stop") + self.fail_amqp() + time.sleep(3) self.drainTo(ConnectionDown, 4) time.sleep(12) - os.system("sudo service rabbitmq-server start") + self.unfail_amqp() self.drainTo(ConnectionUp, 35) def test_qos_redeclared_on_fail(self): @@ -58,11 +60,11 @@ class TestFailures(CoolAMQPTestCase): self.assertIsInstance(self.amqp.drain(wait=4), MessageReceived) def test_connection_flags_are_okay(self): - os.system("sudo service rabbitmq-server stop") - self.assertIsInstance(self.amqp.drain(wait=4), ConnectionDown) + self.fail_amqp() + self.drainTo(ConnectionDown, 8) self.assertFalse(self.amqp.connected) - os.system("sudo service rabbitmq-server start") - self.assertIsInstance(self.amqp.drain(wait=20), ConnectionUp) + self.unfail_amqp() + self.drainTo(ConnectionUp, 5) self.assertTrue(self.amqp.connected) def test_sending_a_message_is_cancelled(self): @@ -70,7 +72,7 @@ class TestFailures(CoolAMQPTestCase): self.amqp.consume(Queue('wtf1', exclusive=True)) - os.system("sudo service rabbitmq-server stop") + self.fail_amqp() self.drainTo(ConnectionDown, 5) p = self.amqp.send(Message(b'what the fuck'), routing_key='wtf1') @@ -78,7 +80,7 @@ class TestFailures(CoolAMQPTestCase): self.assertTrue(p.wait()) self.assertFalse(p.has_failed()) - os.system("sudo service rabbitmq-server start") + self.fail_unamqp() self.drainToAny([ConnectionUp], 30, forbidden=[MessageReceived]) def test_qos_after_failure(self): @@ -88,12 +90,11 @@ class TestFailures(CoolAMQPTestCase): self.amqp.send(Message(b'what the fuck'), '', routing_key='lol') self.amqp.send(Message(b'what the fuck'), '', routing_key='lol') - p = self.amqp.drain(wait=4) - self.assertIsInstance(p, MessageReceived) + p = self.drainTo(MessageReceived, 4) self.assertIsNone(self.amqp.drain(wait=5)) p.message.ack() - self.assertIsInstance(self.amqp.drain(wait=4), MessageReceived) + self.drainTo(MessageReceived, 4) self.restart_rmq() @@ -108,15 +109,14 @@ class TestFailures(CoolAMQPTestCase): def test_connection_down_and_up_redeclare_queues(self): """are messages generated at all? does it reconnect?""" - - q1 = Queue('wtf1', exclusive=True) + q1 = Queue('wtf1', exclusive=True, auto_delete=True) self.amqp.consume(q1).result() self.restart_rmq() self.amqp.send(Message(b'what the fuck'), routing_key='wtf1') - self.drainTo(MessageReceived, 20) + self.drainTo(MessageReceived, 10) def test_exchanges_are_redeclared(self): xchg = Exchange('a_fanout', type='fanout') diff --git a/tests/test_noack.py b/tests/test_noack.py index 4d0a366..2720947 100644 --- a/tests/test_noack.py +++ b/tests/test_noack.py @@ -1,7 +1,7 @@ # coding=UTF-8 from __future__ import absolute_import, division, print_function import six -import os +import unittest import time from tests.utils import CoolAMQPTestCase diff --git a/tests/utils.py b/tests/utils.py index 49e8565..422c366 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -1,13 +1,13 @@ # coding=UTF-8 from __future__ import absolute_import, division, print_function import unittest -from threading import Lock import time +import socket import collections -import os import monotonic from coolamqp import Cluster, ClusterNode, ConnectionUp, ConnectionDown, ConnectionUp, ConsumerCancelled +from coolamqp.backends.base import AMQPBackend, ConnectionFailedError def getamqp(): @@ -23,47 +23,15 @@ class CoolAMQPTestCase(unittest.TestCase): """ INIT_AMQP = True # override on child classes - - def new_amqp_connection(self, consume_connectionup=True): - obj = self - - class CM(object): - """Context manager. Get new AMQP uplink. Consume ConnectionUp if consume_connectionup - - Use like: - - with self.new_amqp_connection() as amqp2: - amqp2.consume(...) - - """ - def __enter__(self): - self.amqp = getamqp() - if consume_connectionup: - obj.assertIsInstance(self.amqp.drain(3), ConnectionUp) - return self.amqp - - def __exit__(self, exc_type, exc_val, exc_tb): - self.amqp.shutdown() - return False - return CM() - def restart_rmq(self): - # forcibly reset the connection - class FailbowlSocket(object): - def __getattr__(self, name): - import socket - raise socket.error() - - self.amqp.thread.backend.channel.connection.transport.sock = FailbowlSocket() - - self.drainTo([ConnectionDown, ConnectionUp], [5, 10]) - def setUp(self): if self.INIT_AMQP: - os.system('sudo service rabbitmq-server start') # if someone killed it self.__newam = self.new_amqp_connection() self.amqp = self.__newam.__enter__() def tearDown(self): + # if you didn't unfail AMQP, that means you don't know what you doing + self.assertRaises(AttributeError, lambda: self.old_backend) + if self.INIT_AMQP: self.__newam.__exit__(None, None, None) @@ -116,14 +84,92 @@ class CoolAMQPTestCase(unittest.TestCase): :param max_time: in seconds """ - test = self + return TakesLessThanCM(self, max_time) + + # ======failures + def single_fail_amqp(self): # insert single failure + sock = self.amqp.thread.backend.channel.connection.transport.sock + self.amqp.thread.backend.channel.connection.transport.sock = FailbowlSocket() + self.amqp.thread.backend.channel.connection = None # 'connection already closed' or sth like that + + sock.close() + + def fail_amqp(self): # BROKER DEAD: SWITCH ON + + self.old_backend = self.amqp.backend + self.amqp.backend = FailbowlBackend + + def unfail_amqp(self): # BROKER DEAD: SWITCH OFF + self.amqp.backend = self.old_backend + del self.old_backend + + def restart_rmq(self): # simulate a broker restart + self.fail_amqp() + self.single_fail_amqp() + time.sleep(3) + self.unfail_amqp() + + self.drainTo([ConnectionDown, ConnectionUp], [5, 20]) + + def new_amqp_connection(self, consume_connectionup=True): + return AMQPConnectionCM(self, consume_connectionup=consume_connectionup) + + +class TakesLessThanCM(object): + def __init__(self, testCase, max_time): + self.test = testCase + self.max_time = max_time + + def __enter__(self, testCase, max_time): + self.started_at = time.time() + return lambda: time.time() - self.started_at > self.max_time # is_late + + def __exit__(self, tp, v, tb): + self.test.assertLess(time.time() - self.started_at, self.max_time) + return False + + +class AMQPConnectionCM(object): + """Context manager. Get new AMQP uplink. Consume ConnectionUp if consume_connectionup + + Use like: + + with self.new_amqp_connection() as amqp2: + amqp2.consume(...) + + """ + def __init__(self, testCase, consume_connectionup): + self.test = testCase + self.consume_connectionup = consume_connectionup + + def __enter__(self): + self.amqp = getamqp() + if self.consume_connectionup: + self.test.assertIsInstance(self.amqp.drain(3), ConnectionUp) + return self.amqp + + def __exit__(self, exc_type, exc_val, exc_tb): + self.amqp.shutdown() + return False + + +class FailbowlBackend(AMQPBackend): + def __init__(self, node, thread): + AMQPBackend.__init__(self, node, thread) + raise ConnectionFailedError('Failbowl') + + +class FailbowlSocket(object): + def __getattr__(self, item): + def failbowl(*args, **kwargs): + time.sleep(1) # hang and fail + raise socket.error - class CM(object): - def __enter__(self): - self.started_at = time.time() + def sleeper(*args, **kwargs): + time.sleep(1) # hang and fail - def __exit__(self, tp, v, tb): - test.assertLess(time.time() - self.started_at, max_time) - return False + if item in ('close', 'shutdown'): + return sleeper + else: + return failbowl - return CM() -- GitLab