Skip to content
Snippets Groups Projects
Commit 65b62ac7 authored by Piotr Maślanka's avatar Piotr Maślanka
Browse files

code beautifier

parent 8a4fbf7d
No related branches found
No related tags found
No related merge requests found
...@@ -34,12 +34,12 @@ from coolamqp.objects import Exchange ...@@ -34,12 +34,12 @@ from coolamqp.objects import Exchange
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# for holding messages when MODE_CNPUB and link is down # for holding messages when MODE_CNPUB and link is down
CnpubMessageSendOrder = collections.namedtuple('CnpubMessageSendOrder', ('message', 'exchange_name', CnpubMessageSendOrder = collections.namedtuple('CnpubMessageSendOrder', ('message', 'exchange_name',
'routing_key', 'future')) 'routing_key', 'future'))
#todo what if publisher in MODE_CNPUB fails mid message? they dont seem to be recovered
# todo what if publisher in MODE_CNPUB fails mid message? they dont seem to be recovered
class Publisher(Channeler, Synchronized): class Publisher(Channeler, Synchronized):
...@@ -61,9 +61,10 @@ class Publisher(Channeler, Synchronized): ...@@ -61,9 +61,10 @@ class Publisher(Channeler, Synchronized):
_pub and on_fail are synchronized so that _pub doesn't see a partially destroyed class. _pub and on_fail are synchronized so that _pub doesn't see a partially destroyed class.
""" """
MODE_NOACK = 0 # no-ack publishing MODE_NOACK = 0 # no-ack publishing
MODE_CNPUB = 1 # RabbitMQ publisher confirms extension MODE_CNPUB = 1 # RabbitMQ publisher confirms extension
#todo add fallback using plain AMQP transactions - this will remove UnusablePublisher and stuff
# todo add fallback using plain AMQP transactions - this will remove UnusablePublisher and stuff
class UnusablePublisher(Exception): class UnusablePublisher(Exception):
...@@ -86,9 +87,9 @@ class Publisher(Channeler, Synchronized): ...@@ -86,9 +87,9 @@ class Publisher(Channeler, Synchronized):
self.mode = mode self.mode = mode
self.messages = collections.deque() # Messages to publish. From newest to last. self.messages = collections.deque() # Messages to publish. From newest to last.
# tuple of (Message object, exchange name::str, routing_key::str, # tuple of (Message object, exchange name::str, routing_key::str,
# Future to confirm or None, flags as tuple|empty tuple # Future to confirm or None, flags as tuple|empty tuple
self.tagger = None # None, or AtomicTagger instance id MODE_CNPUB self.tagger = None # None, or AtomicTagger instance id MODE_CNPUB
...@@ -124,7 +125,6 @@ class Publisher(Channeler, Synchronized): ...@@ -124,7 +125,6 @@ class Publisher(Channeler, Synchronized):
bodies.append(body[:max_body_size]) bodies.append(body[:max_body_size])
body = body[max_body_size:] body = body[max_body_size:]
self.connection.send([ self.connection.send([
AMQPMethodFrame(self.channel_id, BasicPublish(exchange_name, routing_key, False, False)), AMQPMethodFrame(self.channel_id, BasicPublish(exchange_name, routing_key, False, False)),
AMQPHeaderFrame(self.channel_id, Basic.INDEX, 0, len(message.body), message.properties) AMQPHeaderFrame(self.channel_id, Basic.INDEX, 0, len(message.body), message.properties)
...@@ -148,7 +148,7 @@ class Publisher(Channeler, Synchronized): ...@@ -148,7 +148,7 @@ class Publisher(Channeler, Synchronized):
msg, xchg, rk, fut = self.messages.popleft() msg, xchg, rk, fut = self.messages.popleft()
if not fut.set_running_or_notify_cancel(): if not fut.set_running_or_notify_cancel():
continue # cancelled continue # cancelled
self.tagger.deposit(self.tagger.get_key(), FutureConfirmableRejectable(fut)) self.tagger.deposit(self.tagger.get_key(), FutureConfirmableRejectable(fut))
assert isinstance(xchg, (six.binary_type, six.text_type)) assert isinstance(xchg, (six.binary_type, six.text_type))
...@@ -206,7 +206,7 @@ class Publisher(Channeler, Synchronized): ...@@ -206,7 +206,7 @@ class Publisher(Channeler, Synchronized):
elif self.mode == Publisher.MODE_CNPUB: elif self.mode == Publisher.MODE_CNPUB:
fut = Future() fut = Future()
#todo can optimize this not to create an object if ST_ONLINE already # todo can optimize this not to create an object if ST_ONLINE already
cnpo = CnpubMessageSendOrder(message, exchange, routing_key, fut) cnpo = CnpubMessageSendOrder(message, exchange, routing_key, fut)
self.messages.append(cnpo) self.messages.append(cnpo)
......
...@@ -26,11 +26,13 @@ class ConfirmableRejectable(object): ...@@ -26,11 +26,13 @@ class ConfirmableRejectable(object):
:return: don't care :return: don't care
""" """
class FutureConfirmableRejectable(ConfirmableRejectable): class FutureConfirmableRejectable(ConfirmableRejectable):
""" """
A ConfirmableRejectable that can result a future (with None), A ConfirmableRejectable that can result a future (with None),
or Exception it with a message or Exception it with a message
""" """
def __init__(self, future): def __init__(self, future):
self.future = future self.future = future
...@@ -80,10 +82,10 @@ class AtomicTagger(object): ...@@ -80,10 +82,10 @@ class AtomicTagger(object):
self.lock = threading.RLock() self.lock = threading.RLock()
# Protected by lock # Protected by lock
self.next_tag = 1 # 0 is AMQP-reserved to mean "everything so far" self.next_tag = 1 # 0 is AMQP-reserved to mean "everything so far"
self.tags = [] # a list of (tag, ConfirmableRejectable) self.tags = [] # a list of (tag, ConfirmableRejectable)
# they remain to be acked/nacked # they remain to be acked/nacked
# invariant: FOR EACH i, j: (i>j) => (tags[i][0] > tags[j][0]) # invariant: FOR EACH i, j: (i>j) => (tags[i][0] > tags[j][0])
def deposit(self, tag, obj): def deposit(self, tag, obj):
""" """
...@@ -107,10 +109,10 @@ class AtomicTagger(object): ...@@ -107,10 +109,10 @@ class AtomicTagger(object):
else: else:
# Insert a value at place where it makes sense. Iterate from the end, because # Insert a value at place where it makes sense. Iterate from the end, because
# values will usually land there... # values will usually land there...
i = len(self.tags) - 1 # start index i = len(self.tags) - 1 # start index
while i>0: # this will terminate at i=0 while i > 0: # this will terminate at i=0
if self.tags[i][0] > tag: # this means we should insert it here... if self.tags[i][0] > tag: # this means we should insert it here...
break break
i -= 1 # previousl index i -= 1 # previousl index
...@@ -132,10 +134,10 @@ class AtomicTagger(object): ...@@ -132,10 +134,10 @@ class AtomicTagger(object):
# Compute the ranges # Compute the ranges
for stop, opt in enumerate(self.tags): for stop, opt in enumerate(self.tags):
if opt[0] == tag: if opt[0] == tag:
stop += 1 # this is exactly this tag. Adjust stop to end one further (Python slicing) and stop stop += 1 # this is exactly this tag. Adjust stop to end one further (Python slicing) and stop
break break
if opt[0] > tag: if opt[0] > tag:
break # We went too far, but it's OK, we don't need to bother with adjusting stop break # We went too far, but it's OK, we don't need to bother with adjusting stop
else: else:
# List finished without breaking? That would mean the entire range! # List finished without breaking? That would mean the entire range!
stop = len(self.tags) stop = len(self.tags)
...@@ -148,9 +150,8 @@ class AtomicTagger(object): ...@@ -148,9 +150,8 @@ class AtomicTagger(object):
else: else:
return # not found! return # not found!
if not multiple: if not multiple:
start = stop-1 start = stop - 1
else: else:
# Oh, I know the range! # Oh, I know the range!
stop = len(self.tags) stop = len(self.tags)
...@@ -230,5 +231,3 @@ class Synchronized(object): ...@@ -230,5 +231,3 @@ class Synchronized(object):
return fun(*args, **kwargs) return fun(*args, **kwargs)
return monitored return monitored
...@@ -181,7 +181,8 @@ class Queue(object): ...@@ -181,7 +181,8 @@ class Queue(object):
self.anonymous = len(self.name) == 0 # if this queue is anonymous, it must be regenerated upon reconnect self.anonymous = len(self.name) == 0 # if this queue is anonymous, it must be regenerated upon reconnect
self.consumer_tag = self.name if not self.anonymous else uuid.uuid4().hex.encode('utf8') # bytes, consumer tag to use in AMQP comms self.consumer_tag = self.name if not self.anonymous else uuid.uuid4().hex.encode(
'utf8') # bytes, consumer tag to use in AMQP comms
assert isinstance(self.name, six.binary_type) assert isinstance(self.name, six.binary_type)
assert isinstance(self.consumer_tag, six.binary_type) assert isinstance(self.consumer_tag, six.binary_type)
......
...@@ -52,20 +52,19 @@ class Connection(object): ...@@ -52,20 +52,19 @@ class Connection(object):
self.recvf = ReceivingFramer(self.on_frame) self.recvf = ReceivingFramer(self.on_frame)
#todo a list doesn't seem like a very strong atomicity guarantee # todo a list doesn't seem like a very strong atomicity guarantee
self.watches = {} # channel => list of [Watch instance] self.watches = {} # channel => list of [Watch instance]
self.any_watches = [] # list of Watches that should check everything self.any_watches = [] # list of Watches that should check everything
self.finalize = Callable(oneshots=True) #: public self.finalize = Callable(oneshots=True) #: public
self.state = ST_CONNECTING self.state = ST_CONNECTING
self.callables_on_connected = [] # list of callable/0 self.callables_on_connected = [] # list of callable/0
# Negotiated connection parameters - handshake will fill this in # Negotiated connection parameters - handshake will fill this in
self.free_channels = [] # attaches can use this for shit. self.free_channels = [] # attaches can use this for shit.
# WARNING: thread safety of this hinges on atomicity of .pop or .append # WARNING: thread safety of this hinges on atomicity of .pop or .append
self.frame_max = None self.frame_max = None
self.heartbeat = None self.heartbeat = None
self.extensions = [] self.extensions = []
...@@ -108,7 +107,7 @@ class Connection(object): ...@@ -108,7 +107,7 @@ class Connection(object):
try: try:
sock.connect((self.node_definition.host, self.node_definition.port)) sock.connect((self.node_definition.host, self.node_definition.port))
except socket.error as e: except socket.error as e:
time.sleep(0.5) # Connection refused? Very bad things? time.sleep(0.5) # Connection refused? Very bad things?
else: else:
break break
...@@ -118,8 +117,8 @@ class Connection(object): ...@@ -118,8 +117,8 @@ class Connection(object):
sock.send(b'AMQP\x00\x00\x09\x01') sock.send(b'AMQP\x00\x00\x09\x01')
self.listener_socket = self.listener_thread.register(sock, self.listener_socket = self.listener_thread.register(sock,
on_read=self.recvf.put, on_read=self.recvf.put,
on_fail=self.on_fail) on_fail=self.on_fail)
self.sendf = SendingFramer(self.listener_socket.send) self.sendf = SendingFramer(self.listener_socket.send)
self.watch_for_method(0, (ConnectionClose, ConnectionCloseOk), self.on_connection_close) self.watch_for_method(0, (ConnectionClose, ConnectionCloseOk), self.on_connection_close)
...@@ -141,11 +140,11 @@ class Connection(object): ...@@ -141,11 +140,11 @@ class Connection(object):
""" """
logger.info('Connection lost') logger.info('Connection lost')
self.state = ST_OFFLINE # Update state self.state = ST_OFFLINE # Update state
watchlists = [self.watches[channel] for channel in self.watches] watchlists = [self.watches[channel] for channel in self.watches]
for watchlist in watchlists: # Run all watches - failed for watchlist in watchlists: # Run all watches - failed
for watch in watchlist: for watch in watchlist:
if not watch.cancelled: if not watch.cancelled:
watch.failed() watch.failed()
...@@ -154,7 +153,7 @@ class Connection(object): ...@@ -154,7 +153,7 @@ class Connection(object):
if not watch.cancelled: if not watch.cancelled:
watch.failed() watch.failed()
self.watches = {} # Clear the watch list self.watches = {} # Clear the watch list
self.any_watches = [] self.any_watches = []
# call finalizers # call finalizers
...@@ -166,11 +165,12 @@ class Connection(object): ...@@ -166,11 +165,12 @@ class Connection(object):
Called by ListenerThread. Called by ListenerThread.
""" """
self.on_fail() # it does not make sense to prolong the agony self.on_fail() # it does not make sense to prolong the agony
if isinstance(payload, ConnectionClose): if isinstance(payload, ConnectionClose):
self.send([AMQPMethodFrame(0, ConnectionCloseOk())]) self.send([AMQPMethodFrame(0, ConnectionCloseOk())])
logger.info(u'Broker closed our connection - code %s reason %s', payload.reply_code, payload.reply_text.tobytes().decode('utf8')) logger.info(u'Broker closed our connection - code %s reason %s', payload.reply_code,
payload.reply_text.tobytes().decode('utf8'))
elif isinstance(payload, ConnectionCloseOk): elif isinstance(payload, ConnectionCloseOk):
self.send(None) self.send(None)
...@@ -205,7 +205,7 @@ class Connection(object): ...@@ -205,7 +205,7 @@ class Connection(object):
:param frame: AMQPFrame that was received :param frame: AMQPFrame that was received
""" """
watch_handled = False # True if ANY watch handled this watch_handled = False # True if ANY watch handled this
if isinstance(frame, AMQPMethodFrame): if isinstance(frame, AMQPMethodFrame):
logger.debug('Received %s', frame.payload.NAME) logger.debug('Received %s', frame.payload.NAME)
...@@ -215,7 +215,7 @@ class Connection(object): ...@@ -215,7 +215,7 @@ class Connection(object):
# Note that new watches may arrive while we process existing watches. # Note that new watches may arrive while we process existing watches.
# Therefore, we need to copy watches and zero the list before we proceed # Therefore, we need to copy watches and zero the list before we proceed
if frame.channel in self.watches: if frame.channel in self.watches:
watches = self.watches[frame.channel] # a list watches = self.watches[frame.channel] # a list
self.watches[frame.channel] = [] self.watches[frame.channel] = []
alive_watches = [] alive_watches = []
...@@ -260,7 +260,6 @@ class Connection(object): ...@@ -260,7 +260,6 @@ class Connection(object):
# print('any watch', watch, 'was cancelled') # print('any watch', watch, 'was cancelled')
continue continue
if ((not watch_triggered) or (not watch.oneshot)) and (not watch.cancelled): if ((not watch_triggered) or (not watch.oneshot)) and (not watch.cancelled):
# Watch remains alive if it was NOT triggered, or it's NOT a oneshot # Watch remains alive if it was NOT triggered, or it's NOT a oneshot
alive_watches.append(watch) alive_watches.append(watch)
...@@ -280,10 +279,10 @@ class Connection(object): ...@@ -280,10 +279,10 @@ class Connection(object):
ListenerThread's on_fail callback. ListenerThread's on_fail callback.
""" """
try: try:
#todo why is that necessary? it doesnt pass travis CI if there's no this block # todo why is that necessary? it doesnt pass travis CI if there's no this block
self.listener_socket.oneshot(delay, callback) self.listener_socket.oneshot(delay, callback)
except AttributeError: except AttributeError:
pass #print(dir(self)) pass # print(dir(self))
def unwatch_all(self, channel_id): def unwatch_all(self, channel_id):
""" """
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment