diff --git a/coolamqp/attaches/publisher.py b/coolamqp/attaches/publisher.py index e4239ae4061b75b8cd02a30fdf1672c4c37e3678..77c3b3965a340a94179b6b29b7ecac013970032b 100644 --- a/coolamqp/attaches/publisher.py +++ b/coolamqp/attaches/publisher.py @@ -34,12 +34,12 @@ from coolamqp.objects import Exchange logger = logging.getLogger(__name__) - # for holding messages when MODE_CNPUB and link is down CnpubMessageSendOrder = collections.namedtuple('CnpubMessageSendOrder', ('message', 'exchange_name', 'routing_key', 'future')) -#todo what if publisher in MODE_CNPUB fails mid message? they dont seem to be recovered + +# todo what if publisher in MODE_CNPUB fails mid message? they dont seem to be recovered class Publisher(Channeler, Synchronized): @@ -61,9 +61,10 @@ class Publisher(Channeler, Synchronized): _pub and on_fail are synchronized so that _pub doesn't see a partially destroyed class. """ - MODE_NOACK = 0 # no-ack publishing - MODE_CNPUB = 1 # RabbitMQ publisher confirms extension - #todo add fallback using plain AMQP transactions - this will remove UnusablePublisher and stuff + MODE_NOACK = 0 # no-ack publishing + MODE_CNPUB = 1 # RabbitMQ publisher confirms extension + + # todo add fallback using plain AMQP transactions - this will remove UnusablePublisher and stuff class UnusablePublisher(Exception): @@ -86,9 +87,9 @@ class Publisher(Channeler, Synchronized): self.mode = mode - self.messages = collections.deque() # Messages to publish. From newest to last. - # tuple of (Message object, exchange name::str, routing_key::str, - # Future to confirm or None, flags as tuple|empty tuple + self.messages = collections.deque() # Messages to publish. From newest to last. + # tuple of (Message object, exchange name::str, routing_key::str, + # Future to confirm or None, flags as tuple|empty tuple self.tagger = None # None, or AtomicTagger instance id MODE_CNPUB @@ -124,7 +125,6 @@ class Publisher(Channeler, Synchronized): bodies.append(body[:max_body_size]) body = body[max_body_size:] - self.connection.send([ AMQPMethodFrame(self.channel_id, BasicPublish(exchange_name, routing_key, False, False)), AMQPHeaderFrame(self.channel_id, Basic.INDEX, 0, len(message.body), message.properties) @@ -148,7 +148,7 @@ class Publisher(Channeler, Synchronized): msg, xchg, rk, fut = self.messages.popleft() if not fut.set_running_or_notify_cancel(): - continue # cancelled + continue # cancelled self.tagger.deposit(self.tagger.get_key(), FutureConfirmableRejectable(fut)) assert isinstance(xchg, (six.binary_type, six.text_type)) @@ -206,7 +206,7 @@ class Publisher(Channeler, Synchronized): elif self.mode == Publisher.MODE_CNPUB: fut = Future() - #todo can optimize this not to create an object if ST_ONLINE already + # todo can optimize this not to create an object if ST_ONLINE already cnpo = CnpubMessageSendOrder(message, exchange, routing_key, fut) self.messages.append(cnpo) diff --git a/coolamqp/attaches/utils.py b/coolamqp/attaches/utils.py index bfb90acd5cee04286c19ce03fa23e7adbdb8322a..a7e1bee88ee9eb7165ae84ad70d20e520565aa0c 100644 --- a/coolamqp/attaches/utils.py +++ b/coolamqp/attaches/utils.py @@ -26,11 +26,13 @@ class ConfirmableRejectable(object): :return: don't care """ + class FutureConfirmableRejectable(ConfirmableRejectable): """ A ConfirmableRejectable that can result a future (with None), or Exception it with a message """ + def __init__(self, future): self.future = future @@ -80,10 +82,10 @@ class AtomicTagger(object): self.lock = threading.RLock() # Protected by lock - self.next_tag = 1 # 0 is AMQP-reserved to mean "everything so far" + self.next_tag = 1 # 0 is AMQP-reserved to mean "everything so far" self.tags = [] # a list of (tag, ConfirmableRejectable) - # they remain to be acked/nacked - # invariant: FOR EACH i, j: (i>j) => (tags[i][0] > tags[j][0]) + # they remain to be acked/nacked + # invariant: FOR EACH i, j: (i>j) => (tags[i][0] > tags[j][0]) def deposit(self, tag, obj): """ @@ -107,10 +109,10 @@ class AtomicTagger(object): else: # Insert a value at place where it makes sense. Iterate from the end, because # values will usually land there... - i = len(self.tags) - 1 # start index + i = len(self.tags) - 1 # start index - while i>0: # this will terminate at i=0 - if self.tags[i][0] > tag: # this means we should insert it here... + while i > 0: # this will terminate at i=0 + if self.tags[i][0] > tag: # this means we should insert it here... break i -= 1 # previousl index @@ -132,10 +134,10 @@ class AtomicTagger(object): # Compute the ranges for stop, opt in enumerate(self.tags): if opt[0] == tag: - stop += 1 # this is exactly this tag. Adjust stop to end one further (Python slicing) and stop + stop += 1 # this is exactly this tag. Adjust stop to end one further (Python slicing) and stop break if opt[0] > tag: - break # We went too far, but it's OK, we don't need to bother with adjusting stop + break # We went too far, but it's OK, we don't need to bother with adjusting stop else: # List finished without breaking? That would mean the entire range! stop = len(self.tags) @@ -148,9 +150,8 @@ class AtomicTagger(object): else: return # not found! - if not multiple: - start = stop-1 + start = stop - 1 else: # Oh, I know the range! stop = len(self.tags) @@ -230,5 +231,3 @@ class Synchronized(object): return fun(*args, **kwargs) return monitored - - diff --git a/coolamqp/objects.py b/coolamqp/objects.py index c7fd455f2c4f0dfcb671632a914104514b80cd41..f72545d841008996b084407fe39c183e897978cf 100644 --- a/coolamqp/objects.py +++ b/coolamqp/objects.py @@ -181,7 +181,8 @@ class Queue(object): self.anonymous = len(self.name) == 0 # if this queue is anonymous, it must be regenerated upon reconnect - self.consumer_tag = self.name if not self.anonymous else uuid.uuid4().hex.encode('utf8') # bytes, consumer tag to use in AMQP comms + self.consumer_tag = self.name if not self.anonymous else uuid.uuid4().hex.encode( + 'utf8') # bytes, consumer tag to use in AMQP comms assert isinstance(self.name, six.binary_type) assert isinstance(self.consumer_tag, six.binary_type) diff --git a/coolamqp/uplink/connection/connection.py b/coolamqp/uplink/connection/connection.py index c36a056fa49143cea42f78ca663b317023f38bce..ddc24accad61d96d03ce04e90de63c272cea8ba3 100644 --- a/coolamqp/uplink/connection/connection.py +++ b/coolamqp/uplink/connection/connection.py @@ -52,20 +52,19 @@ class Connection(object): self.recvf = ReceivingFramer(self.on_frame) - #todo a list doesn't seem like a very strong atomicity guarantee - self.watches = {} # channel => list of [Watch instance] - self.any_watches = [] # list of Watches that should check everything + # todo a list doesn't seem like a very strong atomicity guarantee + self.watches = {} # channel => list of [Watch instance] + self.any_watches = [] # list of Watches that should check everything self.finalize = Callable(oneshots=True) #: public - self.state = ST_CONNECTING - self.callables_on_connected = [] # list of callable/0 + self.callables_on_connected = [] # list of callable/0 # Negotiated connection parameters - handshake will fill this in - self.free_channels = [] # attaches can use this for shit. - # WARNING: thread safety of this hinges on atomicity of .pop or .append + self.free_channels = [] # attaches can use this for shit. + # WARNING: thread safety of this hinges on atomicity of .pop or .append self.frame_max = None self.heartbeat = None self.extensions = [] @@ -108,7 +107,7 @@ class Connection(object): try: sock.connect((self.node_definition.host, self.node_definition.port)) except socket.error as e: - time.sleep(0.5) # Connection refused? Very bad things? + time.sleep(0.5) # Connection refused? Very bad things? else: break @@ -118,8 +117,8 @@ class Connection(object): sock.send(b'AMQP\x00\x00\x09\x01') self.listener_socket = self.listener_thread.register(sock, - on_read=self.recvf.put, - on_fail=self.on_fail) + on_read=self.recvf.put, + on_fail=self.on_fail) self.sendf = SendingFramer(self.listener_socket.send) self.watch_for_method(0, (ConnectionClose, ConnectionCloseOk), self.on_connection_close) @@ -141,11 +140,11 @@ class Connection(object): """ logger.info('Connection lost') - self.state = ST_OFFLINE # Update state + self.state = ST_OFFLINE # Update state watchlists = [self.watches[channel] for channel in self.watches] - for watchlist in watchlists: # Run all watches - failed + for watchlist in watchlists: # Run all watches - failed for watch in watchlist: if not watch.cancelled: watch.failed() @@ -154,7 +153,7 @@ class Connection(object): if not watch.cancelled: watch.failed() - self.watches = {} # Clear the watch list + self.watches = {} # Clear the watch list self.any_watches = [] # call finalizers @@ -166,11 +165,12 @@ class Connection(object): Called by ListenerThread. """ - self.on_fail() # it does not make sense to prolong the agony + self.on_fail() # it does not make sense to prolong the agony if isinstance(payload, ConnectionClose): self.send([AMQPMethodFrame(0, ConnectionCloseOk())]) - logger.info(u'Broker closed our connection - code %s reason %s', payload.reply_code, payload.reply_text.tobytes().decode('utf8')) + logger.info(u'Broker closed our connection - code %s reason %s', payload.reply_code, + payload.reply_text.tobytes().decode('utf8')) elif isinstance(payload, ConnectionCloseOk): self.send(None) @@ -205,7 +205,7 @@ class Connection(object): :param frame: AMQPFrame that was received """ - watch_handled = False # True if ANY watch handled this + watch_handled = False # True if ANY watch handled this if isinstance(frame, AMQPMethodFrame): logger.debug('Received %s', frame.payload.NAME) @@ -215,7 +215,7 @@ class Connection(object): # Note that new watches may arrive while we process existing watches. # Therefore, we need to copy watches and zero the list before we proceed if frame.channel in self.watches: - watches = self.watches[frame.channel] # a list + watches = self.watches[frame.channel] # a list self.watches[frame.channel] = [] alive_watches = [] @@ -260,7 +260,6 @@ class Connection(object): # print('any watch', watch, 'was cancelled') continue - if ((not watch_triggered) or (not watch.oneshot)) and (not watch.cancelled): # Watch remains alive if it was NOT triggered, or it's NOT a oneshot alive_watches.append(watch) @@ -280,10 +279,10 @@ class Connection(object): ListenerThread's on_fail callback. """ try: - #todo why is that necessary? it doesnt pass travis CI if there's no this block + # todo why is that necessary? it doesnt pass travis CI if there's no this block self.listener_socket.oneshot(delay, callback) except AttributeError: - pass #print(dir(self)) + pass # print(dir(self)) def unwatch_all(self, channel_id): """