diff --git a/coolamqp/attaches/channeler.py b/coolamqp/attaches/channeler.py index 9485d920cb53537291c7a09568578bdca596c6ed..50667c73f53129644715bffe476b79a0ff3e1f07 100644 --- a/coolamqp/attaches/channeler.py +++ b/coolamqp/attaches/channeler.py @@ -103,7 +103,7 @@ class Channeler(Attache): Channeler registers this for: (None - socket dead) - (BasicCancel, ChannelCloseOk, ChannelClose) + (BasicCancel, BasicCancelOk, ChannelCloseOk, ChannelClose) This method provides to send a response for ChannelClose @@ -198,16 +198,25 @@ class Channeler(Attache): raise Exception('Abstract method - override me!') + def register_on_close_watch(self): + """ + Register a watch for on_close. + + Since on_close is a one-shot, it will expire upon calling. + + To be called by on_close, when it needs to be notified just one more time. + """ + self.connection.watch_for_method(self.channel_id, (ChannelClose, ChannelCloseOk, BasicCancel, BasicCancelOk), + self.on_close, + on_fail=self.on_close) + def on_uplink_established(self): """Called by connection. Connection reports being ready to do things.""" assert self.connection is not None assert self.connection.state == ST_ONLINE, repr(self) self.state = ST_SYNCING self.channel_id = self.connection.free_channels.pop() - - self.connection.watch_for_method(self.channel_id, (ChannelClose, ChannelCloseOk, BasicCancel), - self.on_close, - on_fail=self.on_close) + self.register_on_close_watch() self.connection.method_and_watch( self.channel_id, diff --git a/coolamqp/attaches/consumer.py b/coolamqp/attaches/consumer.py index b1123bd7d3f2526f8a1f0d267d170e05a31cf92a..2d69a4a5fb7ef2c28e0783828dbe0b77b0a3ceda 100644 --- a/coolamqp/attaches/consumer.py +++ b/coolamqp/attaches/consumer.py @@ -7,7 +7,8 @@ from coolamqp.framing.frames import AMQPBodyFrame, AMQPHeaderFrame from coolamqp.framing.definitions import ChannelOpenOk, BasicConsume, \ BasicConsumeOk, QueueDeclare, QueueDeclareOk, ExchangeDeclare, ExchangeDeclareOk, \ QueueBind, QueueBindOk, ChannelClose, BasicCancel, BasicDeliver, \ - BasicAck, BasicReject, RESOURCE_LOCKED, BasicCancelOk, BasicQos, HARD_ERRORS + BasicAck, BasicReject, RESOURCE_LOCKED, BasicCancelOk, BasicQos, HARD_ERRORS, \ + BasicCancel from coolamqp.uplink import HeaderOrBodyWatch, MethodWatch from coolamqp.objects import Future from coolamqp.attaches.channeler import Channeler, ST_ONLINE, ST_OFFLINE @@ -99,6 +100,8 @@ class Consumer(Channeler): self.cancel_on_failure = cancel_on_failure self.fucking_memoryviews = fucking_memoryviews + self.consumer_tag = None + def set_qos(self, prefetch_size, prefetch_count): """ Set new QoS for this consumer. @@ -124,15 +127,20 @@ class Consumer(Channeler): if self.future_to_notify_on_dead is not None: # we cancelled it earlier - warnings.warn(u'Why would you cancel a consumer twice?', RuntimeWarning) return self.future_to_notify_on_dead + else: + self.future_to_notify_on_dead = Future() self.cancelled = True - self.method(ChannelClose(0, b'consumer cancelled', 0, 0)) + # you'll blow up big next time you try to use this consumer if you can't cancel, but just close + if self.consumer_tag is not None: + self.method(BasicCancel(self.consumer_tag, False)) + else: + self.method(ChannelClose(0, b'cancelling', 0, 0)) + if self.attache_group is not None: self.attache_group.on_cancel_customer(self) - self.future_to_notify_on_dead = Future() return self.future_to_notify_on_dead @@ -165,9 +173,9 @@ class Consumer(Channeler): """ - if self.cancel_on_failure: - logger.debug('Consumer is cancel_on_failure and failure seen, cancelling') - self.cancel() + if self.cancel_on_failure and (not self.cancelled): + logger.debug('Consumer is cancel_on_failure and failure seen, True->cancelled') + self.cancelled = True if self.state == ST_ONLINE: # The channel has just lost operationality! @@ -179,11 +187,15 @@ class Consumer(Channeler): if isinstance(payload, BasicCancel): # Consumer Cancel Notification - by RabbitMQ # send them back those memoryviews :D + + # on_close is a one_shot watch. We need to re-register it now. + self.register_on_close_watch() self.methods([BasicCancelOk(payload.consumer_tag), ChannelClose(0, b'Received basic.cancel', 0, 0)]) return if isinstance(payload, BasicCancelOk): # OK, our cancelling went just fine - proceed with teardown + self.register_on_close_watch() self.method(ChannelClose(0, b'Received basic.cancel-ok', 0, 0)) return @@ -219,6 +231,7 @@ class Consumer(Channeler): self.fail_on_first_time_resource_locked = False if self.future_to_notify_on_dead: # notify it was cancelled + logger.info('Consumer successfully cancelled') self.future_to_notify_on_dead.set_result() @@ -322,6 +335,7 @@ class Consumer(Channeler): self.connection.watch(mw) self.state = ST_ONLINE + self.consumer_tag = payload.consumer_tag.tobytes() self.on_operational(True) # resend QoS, in case of sth diff --git a/coolamqp/objects.py b/coolamqp/objects.py index a734a8c8244f18c566ec22abfcd08522e8682d76..4979f8f2df37f9c4e7f00197ba6d2d50e73b30f8 100644 --- a/coolamqp/objects.py +++ b/coolamqp/objects.py @@ -209,6 +209,16 @@ class Future(concurrent.futures.Future): self.callables = [] + def __repr__(self): + a = [] + if self.completed: + a.append(u'completed') + if self.successfully: + a.append(u'successfully') + if self.cancelled: + a.append(u'cancelled') + return u'<CoolAMQP Future %s>' % (u' '.join(a), ) + def add_done_callback(self, fn): self.callables.append(fn) diff --git a/coolamqp/uplink/connection/connection.py b/coolamqp/uplink/connection/connection.py index 84e5be3c75c43b43b98faddc176ee6f52c262409..795e7907b0f9aa6f719891914f5850731221f7be 100644 --- a/coolamqp/uplink/connection/connection.py +++ b/coolamqp/uplink/connection/connection.py @@ -52,6 +52,7 @@ class Connection(object): self.recvf = ReceivingFramer(self.on_frame) + #todo a list doesn't seem like a very strong atomicity guarantee self.watches = {} # channel => list of [Watch instance] self.any_watches = [] # list of Watches that should check everything @@ -204,8 +205,12 @@ class Connection(object): logger.debug('Received %s', frame.payload.NAME) # ==================== process per-channel watches + # + # Note that new watches may arrive while we process existing watches. + # Therefore, we need to copy watches and zero the list before we proceed if frame.channel in self.watches: watches = self.watches[frame.channel] # a list + self.watches[frame.channel] = [] alive_watches = [] while len(watches) > 0: @@ -222,12 +227,14 @@ class Connection(object): alive_watches.append(watch) for watch in alive_watches: - watches.append(watch) + self.watches[frame.channel].append(watch) # ==================== process "any" watches alive_watches = [] - while len(self.any_watches): - watch = self.any_watches.pop() + any_watches = self.any_watches + self.any_watches = [] + while len(any_watches): + watch = any_watches.pop() watch_triggered = watch.is_triggered_by(frame) watch_handled |= watch_triggered diff --git a/tests/run.py b/tests/run.py index 4c1a8591e9b2f3021d8189ff2dd3d7e1fecc30e7..5e7d185dc7b8f58a19c65e36618c0609555824ca 100644 --- a/tests/run.py +++ b/tests/run.py @@ -11,20 +11,13 @@ import time NODE = NodeDefinition('127.0.0.1', 'guest', 'guest', heartbeat=20) logging.basicConfig(level=logging.DEBUG) -if __name__ == '__main__': - amqp = Cluster([NODE]) - amqp.start(wait=True) +amqp = Cluster([NODE]) +amqp.start(wait=True) - a = Exchange(u'jolax', type='fanout', auto_delete=True) - bad = Exchange(u'jolax', type='direct', auto_delete=True) - amqp.declare(a).result() +q = Queue(u'lolwut', auto_delete=True, exclusive=True) +c,f=amqp.consume(q, no_ack=True) - try: - amqp.declare(bad).result() - except AMQPError: - print(':)') +#time.sleep(30) - time.sleep(30) - - amqp.shutdown(True) +#amqp.shutdown(True)