diff --git a/coolamqp/attaches/agroup.py b/coolamqp/attaches/agroup.py index 0eaee7d6b176282154bba977660a0d3ffeb6785f..2e3f3440b7939ef14788b8fdbfcaad24fe889f2f 100644 --- a/coolamqp/attaches/agroup.py +++ b/coolamqp/attaches/agroup.py @@ -60,7 +60,8 @@ class AttacheGroup(Attache): :param connection: Connection instance of any state """ - super(AttacheGroup, self).attach(connection) + # since this attache does not watch for failures, it can't use typical method. + self.connection = connection for attache in self.attaches: if not attache.cancelled: diff --git a/coolamqp/attaches/channeler.py b/coolamqp/attaches/channeler.py index f0997514b5077d45ff28e072ad90a795eac992a3..30849434deaba71175cf145b62a982a2674572b9 100644 --- a/coolamqp/attaches/channeler.py +++ b/coolamqp/attaches/channeler.py @@ -34,6 +34,8 @@ class Attache(object): :param connection: Connection instance of any state """ + assert self.connection is None + assert connection.state != ST_OFFLINE self.connection = connection @@ -111,27 +113,30 @@ class Channeler(Attache): If you need to do something else than just close a channel, please extend or modify as necessary. + WARNING: THIS WILL GET CALLED TWICE. + Once on ChannelClose - if so, + Second with None - because socket dies. + + Be prepared! + """ + if self.connection is None: + # teardown already done + return + if self.state == ST_ONLINE: # The channel has just lost operationality! self.on_operational(False) self.state = ST_OFFLINE - if payload is None: - # Connection went down HARD + if isinstance(payload, (ChannelClose, ChannelCloseOk)): + assert self.channel_id is not None self.connection.free_channels.append(self.channel_id) - self.channel_id = None - elif isinstance(payload, ChannelClose): - # We have failed - print('Channel close: RC=%s RT=%s', payload.reply_code, payload.reply_text) - self.connection.free_channels.append(self.channel_id) - self.channel_id = None + # it's just dead don't bother with returning port - elif isinstance(payload, ChannelCloseOk): - self.connection.free_channels.append(self.channel_id) - self.channel_id = None - else: - raise Exception('Unrecognized payload - did you forget to handle something? :D') + self.connection = None + self.channel_id = None + print(self, 'pwned') def methods(self, payloads): """ @@ -180,6 +185,7 @@ class Channeler(Attache): def on_uplink_established(self): """Called by connection. Connection reports being ready to do things.""" assert self.connection is not None + assert self.connection.state == ST_ONLINE, repr(self) self.state = ST_SYNCING self.channel_id = self.connection.free_channels.pop() diff --git a/coolamqp/attaches/consumer.py b/coolamqp/attaches/consumer.py index 763bc1344104e28bb3145896ec663ed5943006b6..f0cd95dcf24893133d711511553f9010c26c93b2 100644 --- a/coolamqp/attaches/consumer.py +++ b/coolamqp/attaches/consumer.py @@ -84,6 +84,9 @@ class Consumer(Channeler): 1. self.state <- ST_OFFLINE, on_event(EV_OFFLINE) upon detecting that no more messages will be there 2. self.channel_id <- None, channel is returned to Connection - channel has been physically torn down + + Note, this can be called multiple times, and eventually with None. + """ if self.state == ST_ONLINE: # The channel has just lost operationality! @@ -95,22 +98,22 @@ class Consumer(Channeler): if isinstance(payload, BasicCancel): # Consumer Cancel Notification - by RabbitMQ self.methods([BasicCancelOk(), ChannelClose(0, b'Received basic.cancel', 0, 0)]) + return - elif isinstance(payload, BasicCancelOk): + if isinstance(payload, BasicCancelOk): # OK, our cancelling went just fine - proceed with teardown self.method(ChannelClose(0, b'Received basic.cancel-ok', 0, 0)) + return - elif isinstance(payload, ChannelClose): + if isinstance(payload, ChannelClose): if payload.reply_code in (ACCESS_REFUSED, RESOURCE_LOCKED): should_retry = True - super(Consumer, self).on_close(payload) - else: - super(Consumer, self).on_close(payload) + + super(Consumer, self).on_close(payload) should_retry = should_retry and (not self.cancelled) - if should_retry: - self.attach(self.connection) + #todo retry on access denied def on_delivery(self, sth): """ diff --git a/coolamqp/attaches/publisher.py b/coolamqp/attaches/publisher.py index 217693d6309f9a4f0afb074b86dab779f1addfe1..28ec95ff0a0aa5785917758bdb32d6d44c45de28 100644 --- a/coolamqp/attaches/publisher.py +++ b/coolamqp/attaches/publisher.py @@ -59,6 +59,7 @@ class Publisher(Channeler, Synchronized): """ MODE_NOACK = 0 # no-ack publishing MODE_CNPUB = 1 # RabbitMQ publisher confirms extension + #todo add fallback using plain AMQP transactions def __init__(self, mode): @@ -66,9 +67,8 @@ class Publisher(Channeler, Synchronized): Create a new publisher :param mode: Publishing mode to use. One of: MODE_NOACK - use non-ack mode - MODE_CNPUB - use consumer publishing mode. TypeError will be raised when this publisher - if attached to a consumer that doesn't have consumer publishes negotiated - :type mode: MODE_NOACK or MODE_CNPUB + MODE_CNPUB - use consumer publishing mode. A switch to MODE_TXPUB will be made + if broker does not support these. :raise ValueError: mode invalid """ Channeler.__init__(self) @@ -85,20 +85,16 @@ class Publisher(Channeler, Synchronized): self.tagger = None # None, or AtomicTagger instance id MODE_CNPUB + @Synchronized.synchronized def attach(self, connection): - super(Publisher, self).attach(connection) + Channeler.attach(self, connection) connection.watch(FailWatch(self.on_fail)) @Synchronized.synchronized def on_fail(self): - """ - Registered as a fail watch for connection - """ self.state = ST_OFFLINE - self.connection = None print('Publisher is FAILED') - @Synchronized.synchronized def _pub(self, message, exchange_name, routing_key): """ Just send the message. Sends BasicDeliver + header + body. @@ -162,6 +158,7 @@ class Publisher(Channeler, Synchronized): elif isinstance(payload, BasicNack): self.tagger.nack(payload.delivery_tag, payload.multiple) + @Synchronized.synchronized def publish(self, message, exchange_name=b'', routing_key=b''): """ Schedule to have a message published. diff --git a/coolamqp/persistence/__init__.py b/coolamqp/persistence/__init__.py index f4baa6193a4cd21cad73ff3680c50a3b27ddc64c..ae11e45a216c4367b3b02ad940b5917e3fb8260e 100644 --- a/coolamqp/persistence/__init__.py +++ b/coolamqp/persistence/__init__.py @@ -15,7 +15,6 @@ from coolamqp.uplink import Connection logger = logging.getLogger(__name__) - class SingleNodeReconnector(object): """ This has a Listener Thread, a Node Definition, and an attache group, @@ -38,6 +37,5 @@ class SingleNodeReconnector(object): self.connection.add_finalizer(self.on_fail) def on_fail(self): - print('I am failed, but will recover!') self.connection = None self.connect() diff --git a/coolamqp/uplink/connection/connection.py b/coolamqp/uplink/connection/connection.py index 965e1ca5c60640e124a698bdf75442e939667029..426804257177c6cb953ab37af963d96a4af7848f 100644 --- a/coolamqp/uplink/connection/connection.py +++ b/coolamqp/uplink/connection/connection.py @@ -73,6 +73,8 @@ class Connection(object): If you call it while the connection IS up, callable will be called even before this returns. + You should be optimally an attached attache to receive this. + :param callable: callable/0 to call """ if self.state == ST_ONLINE: @@ -82,6 +84,7 @@ class Connection(object): def on_connected(self): """Called by handshaker upon reception of final connection.open-ok""" + print(self.free_channels) self.state = ST_ONLINE while len(self.callables_on_connected) > 0: @@ -141,6 +144,7 @@ class Connection(object): WARNING: Note that .on_fail can get called twice - once from .on_connection_close, and second time from ListenerThread when socket is disposed of + Therefore we need to make sure callbacks are called EXACTLY once """ self.state = ST_OFFLINE # Update state @@ -157,8 +161,8 @@ class Connection(object): self.any_watches = [] # call finalizers - for finalizer in self.finalizers: - finalizer() + while len(self.finalizers) > 0: + self.finalizers.pop()() def on_connection_close(self, payload): """ @@ -268,6 +272,7 @@ class Connection(object): Register a watch. :param watch: Watch to register """ + assert self.state != ST_OFFLINE if watch.channel is None: self.any_watches.append(watch) elif watch.channel not in self.watches: diff --git a/coolamqp/uplink/handshake.py b/coolamqp/uplink/handshake.py index c35423ff52e3e63edd886fc7560c503df2dc796a..4c8bf1e6fdfdad717af656fe0b6087387dbc4004 100644 --- a/coolamqp/uplink/handshake.py +++ b/coolamqp/uplink/handshake.py @@ -77,7 +77,6 @@ class Handshaker(object): server_props = dict(payload.server_properties) if b'capabilities' in server_props: for label, fv in server_props[b'capabilities'][0]: - print('Detected extension: %s' % (label, )) if label in SUPPORTED_EXTENSIONS: if fv[0]: self.connection.extensions.append(label) @@ -95,6 +94,7 @@ class Handshaker(object): def on_connection_tune(self, payload): self.connection.frame_max = payload.frame_max self.connection.heartbeat = min(payload.heartbeat, self.heartbeat) + print('Selected', payload.channel_max, 'channels') for channel in six.moves.xrange(1, (65535 if payload.channel_max == 0 else payload.channel_max)+1): self.connection.free_channels.append(channel)