diff --git a/coolamqp/attaches/consumer.py b/coolamqp/attaches/consumer.py index 2d69a4a5fb7ef2c28e0783828dbe0b77b0a3ceda..bf09c6cc1ef6d897044f3caa688fa4b54986a13c 100644 --- a/coolamqp/attaches/consumer.py +++ b/coolamqp/attaches/consumer.py @@ -2,6 +2,7 @@ from __future__ import absolute_import, division, print_function import six import logging +import uuid import warnings from coolamqp.framing.frames import AMQPBodyFrame, AMQPHeaderFrame from coolamqp.framing.definitions import ChannelOpenOk, BasicConsume, \ @@ -157,6 +158,8 @@ class Consumer(Channeler): self.future_to_notify = None else: + self.hb_watch.cancel() + self.deliver_watch.cancel() self.receiver.on_gone() self.receiver = None @@ -180,7 +183,9 @@ class Consumer(Channeler): if self.state == ST_ONLINE: # The channel has just lost operationality! self.on_operational(False) - self.state = ST_OFFLINE + self.state = ST_OFFLINE + + # deliver and head/body watch will clean up after themselves should_retry = False @@ -245,6 +250,12 @@ class Consumer(Channeler): Callback for delivery-related shit :param sth: AMQPMethodFrame WITH basic-deliver, AMQPHeaderFrame or AMQPBodyFrame """ + + if self.receiver is None: + # spurious message during destruction of consumer? + logger.debug('Spurious deliver') + return + if isinstance(sth, BasicDeliver): self.receiver.on_basic_deliver(sth) elif isinstance(sth, AMQPBodyFrame): @@ -318,8 +329,11 @@ class Consumer(Channeler): # itadakimasu if self.qos is not None: self.method(BasicQos(self.qos[0], self.qos[1], False)) + + self.consumer_tag = uuid.uuid4().hex.encode('utf8') # str in py2, unicode in py3 + self.method_and_watch( - BasicConsume(self.queue.name, self.queue.name, + BasicConsume(self.queue.name, self.consumer_tag, False, self.no_ack, self.queue.exclusive, False, []), BasicConsumeOk, self.on_setup @@ -327,16 +341,21 @@ class Consumer(Channeler): elif isinstance(payload, BasicConsumeOk): # AWWW RIGHT~!!! We're good. + self.on_operational(True) + consumer_tag = self.consumer_tag + # Register watches for receiving shit - self.connection.watch(HeaderOrBodyWatch(self.channel_id, self.on_delivery)) - mw = MethodWatch(self.channel_id, BasicDeliver, self.on_delivery) - mw.oneshot = False - self.connection.watch(mw) + # this is multi-shot by default + self.hb_watch = HeaderOrBodyWatch(self.channel_id, self.on_delivery) + self.connection.watch(self.hb_watch) + + # multi-shot watches need manual cleanup! + self.deliver_watch = MethodWatch(self.channel_id, BasicDeliver, self.on_delivery) + self.deliver_watch.oneshot = False + self.connection.watch(self.deliver_watch) self.state = ST_ONLINE - self.consumer_tag = payload.consumer_tag.tobytes() - self.on_operational(True) # resend QoS, in case of sth if self.qos is not None: diff --git a/coolamqp/uplink/connection/connection.py b/coolamqp/uplink/connection/connection.py index 795e7907b0f9aa6f719891914f5850731221f7be..72d10ca9fcc7129e8dc6a4eb32a4bf1b074dc6f5 100644 --- a/coolamqp/uplink/connection/connection.py +++ b/coolamqp/uplink/connection/connection.py @@ -11,7 +11,7 @@ from coolamqp.uplink.connection.send_framer import SendingFramer from coolamqp.framing.frames import AMQPMethodFrame from coolamqp.uplink.handshake import Handshaker from coolamqp.framing.definitions import ConnectionClose, ConnectionCloseOk -from coolamqp.uplink.connection.watches import MethodWatch +from coolamqp.uplink.connection.watches import MethodWatch, Watch from coolamqp.uplink.connection.states import ST_ONLINE, ST_OFFLINE, ST_CONNECTING from coolamqp.objects import Callable @@ -146,10 +146,12 @@ class Connection(object): for watchlist in watchlists: # Run all watches - failed for watch in watchlist: - watch.failed() + if not watch.cancelled: + watch.failed() for watch in self.any_watches: - watch.failed() + if not watch.cancelled: + watch.failed() self.watches = {} # Clear the watch list self.any_watches = [] @@ -183,6 +185,9 @@ class Connection(object): :param reason: optional human-readable reason for this action """ if frames is not None: + # for frame in frames: + # if isinstance(frame, AMQPMethodFrame): + # print('Sending ', frame.payload) self.sendf.send(frames, priority=priority) else: # Listener socket will kill us when time is right @@ -217,12 +222,17 @@ class Connection(object): watch = watches.pop() if watch.cancelled: + # print('watch',watch,'was cancelled') continue watch_triggered = watch.is_triggered_by(frame) watch_handled |= watch_triggered - if (not watch_triggered) or (not watch.oneshot): + if watch.cancelled: + # print('watch',watch,'was cancelled') + continue + + if ((not watch_triggered) or (not watch.oneshot)) and (not watch.cancelled): # Watch remains alive if it was NOT triggered, or it's NOT a oneshot alive_watches.append(watch) @@ -235,10 +245,20 @@ class Connection(object): self.any_watches = [] while len(any_watches): watch = any_watches.pop() + + if watch.cancelled: + # print('any watch', watch, 'was cancelled') + continue + watch_triggered = watch.is_triggered_by(frame) watch_handled |= watch_triggered - if (not watch_triggered) or (not watch.oneshot): + if watch.cancelled: + # print('any watch', watch, 'was cancelled') + continue + + + if ((not watch_triggered) or (not watch.oneshot)) and (not watch.cancelled): # Watch remains alive if it was NOT triggered, or it's NOT a oneshot alive_watches.append(watch) diff --git a/coolamqp/uplink/connection/watches.py b/coolamqp/uplink/connection/watches.py index 10026c88df42b88f4a201db3fba7ecb15b924fbd..5f273d586580e635d877fb563d86b9f12e4d607d 100644 --- a/coolamqp/uplink/connection/watches.py +++ b/coolamqp/uplink/connection/watches.py @@ -9,6 +9,9 @@ class Watch(object): A watch is placed per-channel, to listen for a particular frame. """ + class CancelMe(Exception): + """To be raised in a watch if it wants to be cancelled""" + def __init__(self, channel, oneshot): """ :param channel: Channel to listen to. @@ -60,7 +63,10 @@ class AnyWatch(Watch): self.callable = callable def is_triggered_by(self, frame): - self.callable(frame) + try: + self.callable(frame) + except Watch.CancelMe: + self.cancel() return True @@ -91,7 +97,10 @@ class HeaderOrBodyWatch(Watch): def is_triggered_by(self, frame): if not (isinstance(frame, (AMQPHeaderFrame, AMQPBodyFrame))): return False - self.callable(frame) + try: + self.callable(frame) + except Watch.CancelMe: + self.cancel() return True @@ -123,6 +132,9 @@ class MethodWatch(Watch): return False if isinstance(frame.payload, self.methods): - self.callable(frame.payload) + try: + self.callable(frame.payload) + except Watch.CancelMe: + self.cancel() return True return False