Skip to content
Snippets Groups Projects
Commit ccf40787 authored by Piotr Maślanka's avatar Piotr Maślanka
Browse files

yay qos

parent 2e3c4fb6
No related branches found
No related tags found
No related merge requests found
......@@ -5,7 +5,7 @@ from coolamqp.framing.frames import AMQPBodyFrame, AMQPHeaderFrame
from coolamqp.framing.definitions import ChannelOpenOk, BasicConsume, \
BasicConsumeOk, QueueDeclare, QueueDeclareOk, ExchangeDeclare, ExchangeDeclareOk, \
QueueBind, QueueBindOk, ChannelClose, BasicCancel, BasicDeliver, \
BasicAck, BasicReject, ACCESS_REFUSED, RESOURCE_LOCKED, BasicCancelOk
BasicAck, BasicReject, ACCESS_REFUSED, RESOURCE_LOCKED, BasicCancelOk, BasicQos
from coolamqp.uplink import HeaderOrBodyWatch, MethodWatch
from coolamqp.attaches.channeler import Channeler, ST_ONLINE, ST_OFFLINE
......@@ -36,6 +36,8 @@ class Consumer(Channeler):
:param queue: Queue object, being consumed from right now.
Note that name of anonymous queue might change at any time!
:param no_ack: Will this consumer require acknowledges from messages?
:param qos: a tuple of (prefetch size, prefetch window) for this consumer
:type qos: tuple(int, int) or tuple(None, int)
:param dont_pause: Consumer will fail on the spot instead of pausing
"""
super(Consumer, self).__init__()
......@@ -50,6 +52,22 @@ class Consumer(Channeler):
self.attache_group = None # attache group this belongs to.
# if this is not None, then it has an attribute
# on_cancel_customer(Consumer instance)
if qos is not None:
if qos[0] is None:
qos = 0, qos[1] # prefetch_size=0=undefined
self.qos = qos
self.qos_update_sent = False # QoS was not sent to server
def set_qos(self, prefetch_size, prefetch_count):
"""
Set new QoS for this consumer.
:param prefetch_size: prefetch in octets
:param prefetch_count: prefetch in whole messages
"""
if self.state == ST_ONLINE:
self.method(BasicQos(prefetch_size or 0, prefetch_count, False))
self.qos = prefetch_size or 0, prefetch_count
def cancel(self):
"""
......@@ -109,9 +127,12 @@ class Consumer(Channeler):
if payload.reply_code in (ACCESS_REFUSED, RESOURCE_LOCKED):
should_retry = True
# We might not want to throw the connection away.
should_retry = should_retry and (not self.cancelled)
super(Consumer, self).on_close(payload)
should_retry = should_retry and (not self.cancelled)
#todo retry on access denied
......@@ -191,6 +212,8 @@ class Consumer(Channeler):
self.on_setup(QueueBindOk())
elif isinstance(payload, QueueBindOk):
# itadakimasu
if self.qos is not None:
self.method(BasicQos(self.qos[0], self.qos[1], False))
self.method_and_watch(
BasicConsume(self.queue.name.encode('utf8'), self.queue.name.encode('utf8'),
False, self.no_ack, self.queue.exclusive, False, []),
......@@ -210,6 +233,9 @@ class Consumer(Channeler):
self.state = ST_ONLINE
self.on_operational(True)
# resend QoS, in case of sth
self.set_qos(self.qos[0], self.qos[1])
class MessageReceiver(object):
......
......@@ -22,7 +22,7 @@ if __name__ == '__main__':
snr = SingleNodeReconnector(NODE, ag, lt)
snr.connect()
ag.add(Consumer(Queue('siema-eniu'), no_ack=True))
ag.add(Consumer(Queue('siema-eniu'), no_ack=False, qos=(None, 20)))
class IPublishThread(threading.Thread):
def __init__(self, ag):
......@@ -43,4 +43,6 @@ if __name__ == '__main__':
while True:
time.sleep(30)
lt.terminate()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment