diff --git a/coolamqp/attaches/consumer.py b/coolamqp/attaches/consumer.py index 269ae0a11a0dc2ca4425649c2cd9bdc6eb1e7121..b1123bd7d3f2526f8a1f0d267d170e05a31cf92a 100644 --- a/coolamqp/attaches/consumer.py +++ b/coolamqp/attaches/consumer.py @@ -7,11 +7,11 @@ from coolamqp.framing.frames import AMQPBodyFrame, AMQPHeaderFrame from coolamqp.framing.definitions import ChannelOpenOk, BasicConsume, \ BasicConsumeOk, QueueDeclare, QueueDeclareOk, ExchangeDeclare, ExchangeDeclareOk, \ QueueBind, QueueBindOk, ChannelClose, BasicCancel, BasicDeliver, \ - BasicAck, BasicReject, RESOURCE_LOCKED, BasicCancelOk, BasicQos, HARD_ERROR + BasicAck, BasicReject, RESOURCE_LOCKED, BasicCancelOk, BasicQos, HARD_ERRORS from coolamqp.uplink import HeaderOrBodyWatch, MethodWatch from coolamqp.objects import Future from coolamqp.attaches.channeler import Channeler, ST_ONLINE, ST_OFFLINE -from coolamqp.exceptions import ResourceLocked, AMQPError +from coolamqp.exceptions import AMQPError logger = logging.getLogger(__name__) @@ -197,12 +197,14 @@ class Consumer(Channeler): if self.fail_on_first_time_resource_locked: # still, a RESOURCE_LOCKED on a first declaration ever suggests something is very wrong if self.future_to_notify: - self.future_to_notify.set_exception(ResourceLocked(payload)) + self.future_to_notify.set_exception(AMQPError(payload)) self.future_to_notify = None self.cancel() - - should_retry = True - elif rc in HARD_ERROR: + else: + # Do not notify the user, and retry at will. + # Do not zero the future - we will need to later confirm it, so it doesn't leak. + should_retry = True + elif rc in HARD_ERRORS: logger.warn('Channel closed due to hard error, %s: %s', payload.reply_code, payload.reply_text) if self.future_to_notify: self.future_to_notify.set_exception(AMQPError(payload)) diff --git a/coolamqp/exceptions.py b/coolamqp/exceptions.py index dec32c0dc7ba221925b43b3c31bced88f41b5c05..f10736b0e1921dca2b40135fb96fa0899cac8786 100644 --- a/coolamqp/exceptions.py +++ b/coolamqp/exceptions.py @@ -1,6 +1,10 @@ # coding=UTF-8 from __future__ import absolute_import, division, print_function +from coolamqp.framing.definitions import HARD_ERRORS, SOFT_ERRORS, CONNECTION_FORCED, INVALID_PATH, FRAME_ERROR, \ + SYNTAX_ERROR, COMMAND_INVALID, CHANNEL_ERROR, UNEXPECTED_FRAME, RESOURCE_ERROR, NOT_ALLOWED, NOT_IMPLEMENTED, \ + INTERNAL_ERROR, CONTENT_TOO_LARGE, NO_CONSUMERS, ACCESS_REFUSED, NOT_FOUND, RESOURCE_LOCKED, PRECONDITION_FAILED + class CoolAMQPError(Exception): """Base class for CoolAMQP errors""" @@ -17,6 +21,11 @@ class AMQPError(CoolAMQPError): """ Base class for errors received from AMQP server """ + + def is_hard_error(self): + """Does this error close the connection?""" + return self.reply_code in HARD_ERRORS + def __init__(self, *args): """ @@ -33,7 +42,3 @@ class AMQPError(CoolAMQPError): else: assert len(args) == 4 self.reply_code, self.reply_text, self.class_id, self.method_id = args - - -class ResourceLocked(AMQPError): - """Shorthand to catch that stuff easier""" diff --git a/coolamqp/framing/compilation/compile_definitions.py b/coolamqp/framing/compilation/compile_definitions.py index ead887a6ea30a55c7fb8704684dc1bc12d8f4850..2b9b16699e30cff965926bd29fdedf0ab58cd370 100644 --- a/coolamqp/framing/compilation/compile_definitions.py +++ b/coolamqp/framing/compilation/compile_definitions.py @@ -100,7 +100,7 @@ Field = collections.namedtuple('Field', ('name', 'type', 'basic_type', 'reserved con_classes[constant.kind].append(pythonify_name(constant.name)) for constant_kind, constants in con_classes.items(): - line('\n%s = [%s]', pythonify_name(constant_kind), u', '.join(constants)) + line('\n%sS = [%s]', pythonify_name(constant_kind), u', '.join(constants)) # get domains domain_to_basic_type = {} diff --git a/coolamqp/framing/definitions.py b/coolamqp/framing/definitions.py index e387a50b53f813f3f2e4f10d25d7879806fb5e30..6532deae8fa46f068124ca1543a7fef0e89e6098 100644 --- a/coolamqp/framing/definitions.py +++ b/coolamqp/framing/definitions.py @@ -129,8 +129,8 @@ INTERNAL_ERROR = 541 # The server may require intervention by an operator in order to resume # normal operations. -HARD_ERROR = [CONNECTION_FORCED, INVALID_PATH, FRAME_ERROR, SYNTAX_ERROR, COMMAND_INVALID, CHANNEL_ERROR, UNEXPECTED_FRAME, RESOURCE_ERROR, NOT_ALLOWED, NOT_IMPLEMENTED, INTERNAL_ERROR] -SOFT_ERROR = [CONTENT_TOO_LARGE, NO_CONSUMERS, ACCESS_REFUSED, NOT_FOUND, RESOURCE_LOCKED, PRECONDITION_FAILED] +HARD_ERRORS = [CONNECTION_FORCED, INVALID_PATH, FRAME_ERROR, SYNTAX_ERROR, COMMAND_INVALID, CHANNEL_ERROR, UNEXPECTED_FRAME, RESOURCE_ERROR, NOT_ALLOWED, NOT_IMPLEMENTED, INTERNAL_ERROR] +SOFT_ERRORS = [CONTENT_TOO_LARGE, NO_CONSUMERS, ACCESS_REFUSED, NOT_FOUND, RESOURCE_LOCKED, PRECONDITION_FAILED] DOMAIN_TO_BASIC_TYPE = { diff --git a/coolamqp/uplink/handshake.py b/coolamqp/uplink/handshake.py index c3ea13fd0dd678c03a0db2aeb207e2fe2d9b4f48..8226393f7091121db1c168bb189ace8e5f2021c6 100644 --- a/coolamqp/uplink/handshake.py +++ b/coolamqp/uplink/handshake.py @@ -22,7 +22,7 @@ CLIENT_DATA = [ # because RabbitMQ is some kind of a fascist and does not allow # these fields to be of type short-string (b'product', (b'CoolAMQP', 'S')), - (b'version', (b'0.82', 'S')), + (b'version', (b'0.83', 'S')), (b'copyright', (b'Copyright (C) 2016-2017 DMS Serwis', 'S')), (b'information', (b'Licensed under the MIT License.\nSee https://github.com/smok-serwis/coolamqp for details', 'S')), (b'capabilities', ([(capa, (True, 't')) for capa in SUPPORTED_EXTENSIONS], 'F')), diff --git a/setup.py b/setup.py index 928ce75da08710fb093b0349f88e30d5d2b310f6..e3fde995ab7c0e01924106d5db4da64c1087ccbe 100644 --- a/setup.py +++ b/setup.py @@ -4,12 +4,12 @@ from setuptools import setup setup(name=u'CoolAMQP', - version='0.82', + version='0.83', description=u'The fastest AMQP client', author=u'DMS Serwis s.c.', author_email=u'piotrm@smok.co', url=u'https://github.com/smok-serwis/coolamqp', - download_url='https://github.com/smok-serwis/coolamqp/archive/v0.82.zip', + download_url='https://github.com/smok-serwis/coolamqp/archive/v0.83.zip', keywords=['amqp', 'rabbitmq', 'client', 'network', 'ha', 'high availability'], packages=[ 'coolamqp', @@ -41,7 +41,10 @@ Also, handles your reconnects and transactionality THE RIGHT WAY''', # 'Development Status :: 5 - Production/Stable', 'Development Status :: 4 - Beta', 'License :: OSI Approved :: MIT License', - 'Topic :: Software Development :: Libraries' + 'Topic :: Software Development :: Libraries', + 'Intended Audience :: Developers', + 'Topic :: Software Development :: Libraries :: Python Modules' # obvi + ] ) diff --git a/tests/test_clustering/test_double.py b/tests/test_clustering/test_double.py index f31e27559a0135abac1ce6a3fae388b4c9195a1e..561adbf1286da46bcf543fabbd9601d079f5a76c 100644 --- a/tests/test_clustering/test_double.py +++ b/tests/test_clustering/test_double.py @@ -9,7 +9,7 @@ import time, logging, threading from coolamqp.objects import Message, MessageProperties, NodeDefinition, Queue, ReceivedMessage from coolamqp.clustering import Cluster NODE = NodeDefinition('127.0.0.1', 'guest', 'guest', heartbeat=20) -from coolamqp.exceptions import ResourceLocked +from coolamqp.exceptions import AMQPError, RESOURCE_LOCKED class TestDouble(unittest.TestCase): @@ -31,6 +31,11 @@ class TestDouble(unittest.TestCase): con, fut = self.c1.consume(q, qos=(None, 20)) fut.result() - con2, fut2 = self.c2.consume(q, fail_on_first_time_resource_locked=True) - - self.assertRaises(ResourceLocked, lambda: fut2.result()) + try: + con2, fut2 = self.c2.consume(q, fail_on_first_time_resource_locked=True) + fut2.result() + except AMQPError as e: + self.assertEquals(e.reply_code, RESOURCE_LOCKED) + self.assertFalse(e.is_hard_error()) + else: + self.fail('Expected exception')