diff --git a/coolamqp/attaches/consumer.py b/coolamqp/attaches/consumer.py index beccd72c3e991b0def16373bf79344fb929011a0..20440eeb432f107c6e31188730ecd937b719409d 100644 --- a/coolamqp/attaches/consumer.py +++ b/coolamqp/attaches/consumer.py @@ -9,7 +9,7 @@ from coolamqp.framing.definitions import ChannelOpenOk, BasicConsume, \ BasicConsumeOk, QueueDeclare, QueueDeclareOk, ExchangeDeclare, ExchangeDeclareOk, \ QueueBind, QueueBindOk, ChannelClose, BasicCancel, BasicDeliver, \ BasicAck, BasicReject, RESOURCE_LOCKED, BasicCancelOk, BasicQos, HARD_ERRORS, \ - BasicCancel + BasicCancel, BasicQosOk from coolamqp.uplink import HeaderOrBodyWatch, MethodWatch from concurrent.futures import Future from coolamqp.attaches.channeler import Channeler, ST_ONLINE, ST_OFFLINE @@ -322,25 +322,27 @@ class Consumer(Channeler): # default exchange, pretend it was bind ok self.on_setup(QueueBindOk()) elif isinstance(payload, QueueBindOk): - # itadakimasu if self.qos is not None: - self.method(BasicQos(self.qos[0], self.qos[1], False)) - - self.consumer_tag = uuid.uuid4().hex.encode('utf8') # str in py2, unicode in py3 - - self.method_and_watch( - BasicConsume(self.queue.name, self.consumer_tag, - False, self.no_ack, self.queue.exclusive, False, []), - BasicConsumeOk, - self.on_setup - ) - + self.method_and_watch( + BasicQos(self.qos[0], self.qos[1], False), + BasicQosOk, + self.on_setup + ) + else: + self.on_setup(BasicQosOk()) # pretend QoS went ok + elif isinstance(payload, BasicQosOk): + self.consumer_tag = uuid.uuid4().hex.encode('utf8') # str in py2, unicode in py3 + self.method_and_watch( + BasicConsume(self.queue.name, self.consumer_tag, + False, self.no_ack, self.queue.exclusive, False, []), + BasicConsumeOk, + self.on_setup + ) elif isinstance(payload, BasicConsumeOk): # AWWW RIGHT~!!! We're good. self.on_operational(True) consumer_tag = self.consumer_tag - # Register watches for receiving shit # this is multi-shot by default self.hb_watch = HeaderOrBodyWatch(self.channel_id, self.on_delivery)