From e252b9d943e7f0cc0be4a7db3240583964980f4e Mon Sep 17 00:00:00 2001 From: Piotr Maslanka <piotr.maslanka@henrietta.com.pl> Date: Mon, 23 Jan 2017 14:33:40 +0100 Subject: [PATCH] hazard in QoS fixed It turns out that time between consume and basic.qos is enough for the server to hammer you into oblivion if sufficient messages are enqueued. "You declare no Qos? Ok, have these 50 thousand messages at once." --- coolamqp/attaches/consumer.py | 30 ++++++++++++++++-------------- 1 file changed, 16 insertions(+), 14 deletions(-) diff --git a/coolamqp/attaches/consumer.py b/coolamqp/attaches/consumer.py index beccd72..20440ee 100644 --- a/coolamqp/attaches/consumer.py +++ b/coolamqp/attaches/consumer.py @@ -9,7 +9,7 @@ from coolamqp.framing.definitions import ChannelOpenOk, BasicConsume, \ BasicConsumeOk, QueueDeclare, QueueDeclareOk, ExchangeDeclare, ExchangeDeclareOk, \ QueueBind, QueueBindOk, ChannelClose, BasicCancel, BasicDeliver, \ BasicAck, BasicReject, RESOURCE_LOCKED, BasicCancelOk, BasicQos, HARD_ERRORS, \ - BasicCancel + BasicCancel, BasicQosOk from coolamqp.uplink import HeaderOrBodyWatch, MethodWatch from concurrent.futures import Future from coolamqp.attaches.channeler import Channeler, ST_ONLINE, ST_OFFLINE @@ -322,25 +322,27 @@ class Consumer(Channeler): # default exchange, pretend it was bind ok self.on_setup(QueueBindOk()) elif isinstance(payload, QueueBindOk): - # itadakimasu if self.qos is not None: - self.method(BasicQos(self.qos[0], self.qos[1], False)) - - self.consumer_tag = uuid.uuid4().hex.encode('utf8') # str in py2, unicode in py3 - - self.method_and_watch( - BasicConsume(self.queue.name, self.consumer_tag, - False, self.no_ack, self.queue.exclusive, False, []), - BasicConsumeOk, - self.on_setup - ) - + self.method_and_watch( + BasicQos(self.qos[0], self.qos[1], False), + BasicQosOk, + self.on_setup + ) + else: + self.on_setup(BasicQosOk()) # pretend QoS went ok + elif isinstance(payload, BasicQosOk): + self.consumer_tag = uuid.uuid4().hex.encode('utf8') # str in py2, unicode in py3 + self.method_and_watch( + BasicConsume(self.queue.name, self.consumer_tag, + False, self.no_ack, self.queue.exclusive, False, []), + BasicConsumeOk, + self.on_setup + ) elif isinstance(payload, BasicConsumeOk): # AWWW RIGHT~!!! We're good. self.on_operational(True) consumer_tag = self.consumer_tag - # Register watches for receiving shit # this is multi-shot by default self.hb_watch = HeaderOrBodyWatch(self.channel_id, self.on_delivery) -- GitLab