diff --git a/coolamqp/attaches/channeler.py b/coolamqp/attaches/channeler.py index 50667c73f53129644715bffe476b79a0ff3e1f07..e988f3ef7e866b791f6743670dedc23d77cd0d9e 100644 --- a/coolamqp/attaches/channeler.py +++ b/coolamqp/attaches/channeler.py @@ -132,19 +132,26 @@ class Channeler(Attache): # teardown already done return + if self.state == ST_ONLINE: # The channel has just lost operationality! Inform others ASAP. + self.on_operational(False) + self.state = ST_OFFLINE + + if not isinstance(payload, (ChannelClose, ChannelCloseOk)) and (payload is not None): + # I do not know how to handle that! + return + if isinstance(payload, ChannelClose): # it would still be good to reply with channel.close-ok self.method(ChannelCloseOk()) - if self.state == ST_ONLINE: - # The channel has just lost operationality! - self.on_operational(False) - self.state = ST_OFFLINE - - if isinstance(payload, (ChannelClose, ChannelCloseOk)): + if payload is not None: assert self.channel_id is not None self.connection.free_channels.append(self.channel_id) # it's just dead don't bother with returning port + # at this point, this channel might still have some watches, + # especially if it was interrupted unexpectedly. + # clean up! + self.connection.unwatch_all(self.channel_id) self.connection = None self.channel_id = None diff --git a/coolamqp/attaches/consumer.py b/coolamqp/attaches/consumer.py index bf09c6cc1ef6d897044f3caa688fa4b54986a13c..fc905134028860fce984a997fd343a7190b30e9a 100644 --- a/coolamqp/attaches/consumer.py +++ b/coolamqp/attaches/consumer.py @@ -213,26 +213,22 @@ class Consumer(Channeler): if self.fail_on_first_time_resource_locked: # still, a RESOURCE_LOCKED on a first declaration ever suggests something is very wrong - if self.future_to_notify: - self.future_to_notify.set_exception(AMQPError(payload)) - self.future_to_notify = None - self.cancel() + self.cancelled = True else: # Do not notify the user, and retry at will. # Do not zero the future - we will need to later confirm it, so it doesn't leak. should_retry = True - elif rc in HARD_ERRORS: - logger.warn('Channel closed due to hard error, %s: %s', payload.reply_code, payload.reply_text) - if self.future_to_notify: - self.future_to_notify.set_exception(AMQPError(payload)) - self.future_to_notify = None + + if self.future_to_notify: + self.future_to_notify.set_exception(AMQPError(payload)) + self.future_to_notify = None # We might not want to throw the connection away. should_retry = should_retry and (not self.cancelled) old_con = self.connection - super(Consumer, self).on_close(payload) # this None's self.connection + super(Consumer, self).on_close(payload) # this None's self.connection and returns port self.fail_on_first_time_resource_locked = False if self.future_to_notify_on_dead: # notify it was cancelled @@ -252,8 +248,7 @@ class Consumer(Channeler): """ if self.receiver is None: - # spurious message during destruction of consumer? - logger.debug('Spurious deliver') + # dead, cancelled, whatever return if isinstance(sth, BasicDeliver): diff --git a/coolamqp/uplink/connection/connection.py b/coolamqp/uplink/connection/connection.py index 72d10ca9fcc7129e8dc6a4eb32a4bf1b074dc6f5..2ec98e5f2e8a6136195cfd9d80f5a94856f87d1d 100644 --- a/coolamqp/uplink/connection/connection.py +++ b/coolamqp/uplink/connection/connection.py @@ -236,8 +236,10 @@ class Connection(object): # Watch remains alive if it was NOT triggered, or it's NOT a oneshot alive_watches.append(watch) - for watch in alive_watches: - self.watches[frame.channel].append(watch) + if frame.channel in self.watches: + # unwatch_all might have gotten called, check that + for watch in alive_watches: + self.watches[frame.channel].append(watch) # ==================== process "any" watches alive_watches = [] diff --git a/coolamqp/uplink/handshake.py b/coolamqp/uplink/handshake.py index 8226393f7091121db1c168bb189ace8e5f2021c6..4fe0b66528eefc1809c97de058477c20b575e0b7 100644 --- a/coolamqp/uplink/handshake.py +++ b/coolamqp/uplink/handshake.py @@ -22,7 +22,7 @@ CLIENT_DATA = [ # because RabbitMQ is some kind of a fascist and does not allow # these fields to be of type short-string (b'product', (b'CoolAMQP', 'S')), - (b'version', (b'0.83', 'S')), + (b'version', (b'0.84', 'S')), (b'copyright', (b'Copyright (C) 2016-2017 DMS Serwis', 'S')), (b'information', (b'Licensed under the MIT License.\nSee https://github.com/smok-serwis/coolamqp for details', 'S')), (b'capabilities', ([(capa, (True, 't')) for capa in SUPPORTED_EXTENSIONS], 'F')), diff --git a/setup.py b/setup.py index e3fde995ab7c0e01924106d5db4da64c1087ccbe..5d940b37300c91b06262014c0ec150303a45c9c6 100644 --- a/setup.py +++ b/setup.py @@ -4,12 +4,12 @@ from setuptools import setup setup(name=u'CoolAMQP', - version='0.83', + version='0.84', description=u'The fastest AMQP client', author=u'DMS Serwis s.c.', author_email=u'piotrm@smok.co', url=u'https://github.com/smok-serwis/coolamqp', - download_url='https://github.com/smok-serwis/coolamqp/archive/v0.83.zip', + download_url='https://github.com/smok-serwis/coolamqp/archive/v0.84.zip', keywords=['amqp', 'rabbitmq', 'client', 'network', 'ha', 'high availability'], packages=[ 'coolamqp',