Skip to content
Snippets Groups Projects
Commit 2e3c4fb6 authored by Piotr Maślanka's avatar Piotr Maślanka
Browse files

reconnects successfuly, access_refused to go

parent bbd7a88c
No related branches found
No related tags found
No related merge requests found
......@@ -60,7 +60,8 @@ class AttacheGroup(Attache):
:param connection: Connection instance of any state
"""
super(AttacheGroup, self).attach(connection)
# since this attache does not watch for failures, it can't use typical method.
self.connection = connection
for attache in self.attaches:
if not attache.cancelled:
......
......@@ -34,6 +34,8 @@ class Attache(object):
:param connection: Connection instance of any state
"""
assert self.connection is None
assert connection.state != ST_OFFLINE
self.connection = connection
......@@ -111,27 +113,30 @@ class Channeler(Attache):
If you need to do something else than just close a channel, please extend or modify as necessary.
WARNING: THIS WILL GET CALLED TWICE.
Once on ChannelClose - if so,
Second with None - because socket dies.
Be prepared!
"""
if self.connection is None:
# teardown already done
return
if self.state == ST_ONLINE:
# The channel has just lost operationality!
self.on_operational(False)
self.state = ST_OFFLINE
if payload is None:
# Connection went down HARD
if isinstance(payload, (ChannelClose, ChannelCloseOk)):
assert self.channel_id is not None
self.connection.free_channels.append(self.channel_id)
self.channel_id = None
elif isinstance(payload, ChannelClose):
# We have failed
print('Channel close: RC=%s RT=%s', payload.reply_code, payload.reply_text)
self.connection.free_channels.append(self.channel_id)
self.channel_id = None
# it's just dead don't bother with returning port
elif isinstance(payload, ChannelCloseOk):
self.connection.free_channels.append(self.channel_id)
self.channel_id = None
else:
raise Exception('Unrecognized payload - did you forget to handle something? :D')
self.connection = None
self.channel_id = None
print(self, 'pwned')
def methods(self, payloads):
"""
......@@ -180,6 +185,7 @@ class Channeler(Attache):
def on_uplink_established(self):
"""Called by connection. Connection reports being ready to do things."""
assert self.connection is not None
assert self.connection.state == ST_ONLINE, repr(self)
self.state = ST_SYNCING
self.channel_id = self.connection.free_channels.pop()
......
......@@ -84,6 +84,9 @@ class Consumer(Channeler):
1. self.state <- ST_OFFLINE, on_event(EV_OFFLINE) upon detecting that no more messages will
be there
2. self.channel_id <- None, channel is returned to Connection - channel has been physically torn down
Note, this can be called multiple times, and eventually with None.
"""
if self.state == ST_ONLINE:
# The channel has just lost operationality!
......@@ -95,22 +98,22 @@ class Consumer(Channeler):
if isinstance(payload, BasicCancel):
# Consumer Cancel Notification - by RabbitMQ
self.methods([BasicCancelOk(), ChannelClose(0, b'Received basic.cancel', 0, 0)])
return
elif isinstance(payload, BasicCancelOk):
if isinstance(payload, BasicCancelOk):
# OK, our cancelling went just fine - proceed with teardown
self.method(ChannelClose(0, b'Received basic.cancel-ok', 0, 0))
return
elif isinstance(payload, ChannelClose):
if isinstance(payload, ChannelClose):
if payload.reply_code in (ACCESS_REFUSED, RESOURCE_LOCKED):
should_retry = True
super(Consumer, self).on_close(payload)
else:
super(Consumer, self).on_close(payload)
super(Consumer, self).on_close(payload)
should_retry = should_retry and (not self.cancelled)
if should_retry:
self.attach(self.connection)
#todo retry on access denied
def on_delivery(self, sth):
"""
......
......@@ -59,6 +59,7 @@ class Publisher(Channeler, Synchronized):
"""
MODE_NOACK = 0 # no-ack publishing
MODE_CNPUB = 1 # RabbitMQ publisher confirms extension
#todo add fallback using plain AMQP transactions
def __init__(self, mode):
......@@ -66,9 +67,8 @@ class Publisher(Channeler, Synchronized):
Create a new publisher
:param mode: Publishing mode to use. One of:
MODE_NOACK - use non-ack mode
MODE_CNPUB - use consumer publishing mode. TypeError will be raised when this publisher
if attached to a consumer that doesn't have consumer publishes negotiated
:type mode: MODE_NOACK or MODE_CNPUB
MODE_CNPUB - use consumer publishing mode. A switch to MODE_TXPUB will be made
if broker does not support these.
:raise ValueError: mode invalid
"""
Channeler.__init__(self)
......@@ -85,20 +85,16 @@ class Publisher(Channeler, Synchronized):
self.tagger = None # None, or AtomicTagger instance id MODE_CNPUB
@Synchronized.synchronized
def attach(self, connection):
super(Publisher, self).attach(connection)
Channeler.attach(self, connection)
connection.watch(FailWatch(self.on_fail))
@Synchronized.synchronized
def on_fail(self):
"""
Registered as a fail watch for connection
"""
self.state = ST_OFFLINE
self.connection = None
print('Publisher is FAILED')
@Synchronized.synchronized
def _pub(self, message, exchange_name, routing_key):
"""
Just send the message. Sends BasicDeliver + header + body.
......@@ -162,6 +158,7 @@ class Publisher(Channeler, Synchronized):
elif isinstance(payload, BasicNack):
self.tagger.nack(payload.delivery_tag, payload.multiple)
@Synchronized.synchronized
def publish(self, message, exchange_name=b'', routing_key=b''):
"""
Schedule to have a message published.
......
......@@ -15,7 +15,6 @@ from coolamqp.uplink import Connection
logger = logging.getLogger(__name__)
class SingleNodeReconnector(object):
"""
This has a Listener Thread, a Node Definition, and an attache group,
......@@ -38,6 +37,5 @@ class SingleNodeReconnector(object):
self.connection.add_finalizer(self.on_fail)
def on_fail(self):
print('I am failed, but will recover!')
self.connection = None
self.connect()
......@@ -73,6 +73,8 @@ class Connection(object):
If you call it while the connection IS up, callable will be called even before this returns.
You should be optimally an attached attache to receive this.
:param callable: callable/0 to call
"""
if self.state == ST_ONLINE:
......@@ -82,6 +84,7 @@ class Connection(object):
def on_connected(self):
"""Called by handshaker upon reception of final connection.open-ok"""
print(self.free_channels)
self.state = ST_ONLINE
while len(self.callables_on_connected) > 0:
......@@ -141,6 +144,7 @@ class Connection(object):
WARNING: Note that .on_fail can get called twice - once from .on_connection_close,
and second time from ListenerThread when socket is disposed of
Therefore we need to make sure callbacks are called EXACTLY once
"""
self.state = ST_OFFLINE # Update state
......@@ -157,8 +161,8 @@ class Connection(object):
self.any_watches = []
# call finalizers
for finalizer in self.finalizers:
finalizer()
while len(self.finalizers) > 0:
self.finalizers.pop()()
def on_connection_close(self, payload):
"""
......@@ -268,6 +272,7 @@ class Connection(object):
Register a watch.
:param watch: Watch to register
"""
assert self.state != ST_OFFLINE
if watch.channel is None:
self.any_watches.append(watch)
elif watch.channel not in self.watches:
......
......@@ -77,7 +77,6 @@ class Handshaker(object):
server_props = dict(payload.server_properties)
if b'capabilities' in server_props:
for label, fv in server_props[b'capabilities'][0]:
print('Detected extension: %s' % (label, ))
if label in SUPPORTED_EXTENSIONS:
if fv[0]:
self.connection.extensions.append(label)
......@@ -95,6 +94,7 @@ class Handshaker(object):
def on_connection_tune(self, payload):
self.connection.frame_max = payload.frame_max
self.connection.heartbeat = min(payload.heartbeat, self.heartbeat)
print('Selected', payload.channel_max, 'channels')
for channel in six.moves.xrange(1, (65535 if payload.channel_max == 0 else payload.channel_max)+1):
self.connection.free_channels.append(channel)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment