diff --git a/coolamqp/attaches/consumer.py b/coolamqp/attaches/consumer.py index f0cd95dcf24893133d711511553f9010c26c93b2..71b3d5114462bd691ea0ab05e2282c0233b5268a 100644 --- a/coolamqp/attaches/consumer.py +++ b/coolamqp/attaches/consumer.py @@ -5,7 +5,7 @@ from coolamqp.framing.frames import AMQPBodyFrame, AMQPHeaderFrame from coolamqp.framing.definitions import ChannelOpenOk, BasicConsume, \ BasicConsumeOk, QueueDeclare, QueueDeclareOk, ExchangeDeclare, ExchangeDeclareOk, \ QueueBind, QueueBindOk, ChannelClose, BasicCancel, BasicDeliver, \ - BasicAck, BasicReject, ACCESS_REFUSED, RESOURCE_LOCKED, BasicCancelOk + BasicAck, BasicReject, ACCESS_REFUSED, RESOURCE_LOCKED, BasicCancelOk, BasicQos from coolamqp.uplink import HeaderOrBodyWatch, MethodWatch from coolamqp.attaches.channeler import Channeler, ST_ONLINE, ST_OFFLINE @@ -36,6 +36,8 @@ class Consumer(Channeler): :param queue: Queue object, being consumed from right now. Note that name of anonymous queue might change at any time! :param no_ack: Will this consumer require acknowledges from messages? + :param qos: a tuple of (prefetch size, prefetch window) for this consumer + :type qos: tuple(int, int) or tuple(None, int) :param dont_pause: Consumer will fail on the spot instead of pausing """ super(Consumer, self).__init__() @@ -50,6 +52,22 @@ class Consumer(Channeler): self.attache_group = None # attache group this belongs to. # if this is not None, then it has an attribute # on_cancel_customer(Consumer instance) + if qos is not None: + if qos[0] is None: + qos = 0, qos[1] # prefetch_size=0=undefined + self.qos = qos + self.qos_update_sent = False # QoS was not sent to server + + def set_qos(self, prefetch_size, prefetch_count): + """ + Set new QoS for this consumer. + + :param prefetch_size: prefetch in octets + :param prefetch_count: prefetch in whole messages + """ + if self.state == ST_ONLINE: + self.method(BasicQos(prefetch_size or 0, prefetch_count, False)) + self.qos = prefetch_size or 0, prefetch_count def cancel(self): """ @@ -109,9 +127,12 @@ class Consumer(Channeler): if payload.reply_code in (ACCESS_REFUSED, RESOURCE_LOCKED): should_retry = True + # We might not want to throw the connection away. + should_retry = should_retry and (not self.cancelled) + + super(Consumer, self).on_close(payload) - should_retry = should_retry and (not self.cancelled) #todo retry on access denied @@ -191,6 +212,8 @@ class Consumer(Channeler): self.on_setup(QueueBindOk()) elif isinstance(payload, QueueBindOk): # itadakimasu + if self.qos is not None: + self.method(BasicQos(self.qos[0], self.qos[1], False)) self.method_and_watch( BasicConsume(self.queue.name.encode('utf8'), self.queue.name.encode('utf8'), False, self.no_ack, self.queue.exclusive, False, []), @@ -210,6 +233,9 @@ class Consumer(Channeler): self.state = ST_ONLINE self.on_operational(True) + # resend QoS, in case of sth + self.set_qos(self.qos[0], self.qos[1]) + class MessageReceiver(object): diff --git a/tests/run.py b/tests/run.py index ae3171b5a6cc18b08e8c4678f6186fec0a974a6c..dc6040c4ee68c585bcabfc5acfccdf73be61625f 100644 --- a/tests/run.py +++ b/tests/run.py @@ -22,7 +22,7 @@ if __name__ == '__main__': snr = SingleNodeReconnector(NODE, ag, lt) snr.connect() - ag.add(Consumer(Queue('siema-eniu'), no_ack=True)) + ag.add(Consumer(Queue('siema-eniu'), no_ack=False, qos=(None, 20))) class IPublishThread(threading.Thread): def __init__(self, ag): @@ -43,4 +43,6 @@ if __name__ == '__main__': while True: time.sleep(30) + + lt.terminate()