Skip to content
Snippets Groups Projects
Commit e252b9d9 authored by Piotr Maślanka's avatar Piotr Maślanka
Browse files

hazard in QoS fixed

It turns out that time between consume and basic.qos is enough for the server to hammer you into oblivion if sufficient messages are enqueued.

"You declare no Qos? Ok, have these 50 thousand messages at once."
parent 173d781c
No related branches found
No related tags found
No related merge requests found
......@@ -9,7 +9,7 @@ from coolamqp.framing.definitions import ChannelOpenOk, BasicConsume, \
BasicConsumeOk, QueueDeclare, QueueDeclareOk, ExchangeDeclare, ExchangeDeclareOk, \
QueueBind, QueueBindOk, ChannelClose, BasicCancel, BasicDeliver, \
BasicAck, BasicReject, RESOURCE_LOCKED, BasicCancelOk, BasicQos, HARD_ERRORS, \
BasicCancel
BasicCancel, BasicQosOk
from coolamqp.uplink import HeaderOrBodyWatch, MethodWatch
from concurrent.futures import Future
from coolamqp.attaches.channeler import Channeler, ST_ONLINE, ST_OFFLINE
......@@ -322,25 +322,27 @@ class Consumer(Channeler):
# default exchange, pretend it was bind ok
self.on_setup(QueueBindOk())
elif isinstance(payload, QueueBindOk):
# itadakimasu
if self.qos is not None:
self.method(BasicQos(self.qos[0], self.qos[1], False))
self.consumer_tag = uuid.uuid4().hex.encode('utf8') # str in py2, unicode in py3
self.method_and_watch(
BasicConsume(self.queue.name, self.consumer_tag,
False, self.no_ack, self.queue.exclusive, False, []),
BasicConsumeOk,
self.on_setup
)
self.method_and_watch(
BasicQos(self.qos[0], self.qos[1], False),
BasicQosOk,
self.on_setup
)
else:
self.on_setup(BasicQosOk()) # pretend QoS went ok
elif isinstance(payload, BasicQosOk):
self.consumer_tag = uuid.uuid4().hex.encode('utf8') # str in py2, unicode in py3
self.method_and_watch(
BasicConsume(self.queue.name, self.consumer_tag,
False, self.no_ack, self.queue.exclusive, False, []),
BasicConsumeOk,
self.on_setup
)
elif isinstance(payload, BasicConsumeOk):
# AWWW RIGHT~!!! We're good.
self.on_operational(True)
consumer_tag = self.consumer_tag
# Register watches for receiving shit
# this is multi-shot by default
self.hb_watch = HeaderOrBodyWatch(self.channel_id, self.on_delivery)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment