diff --git a/MANIFEST b/MANIFEST new file mode 100644 index 0000000000000000000000000000000000000000..fab2abd80eac371db1662a986a85bfdec634a2e2 --- /dev/null +++ b/MANIFEST @@ -0,0 +1,12 @@ +# file GENERATED by distutils, do NOT edit +setup.cfg +setup.py +coolamqp\__init__.py +coolamqp\cluster.py +coolamqp\events.py +coolamqp\handler.py +coolamqp\messages.py +coolamqp\orders.py +coolamqp\backends\__init__.py +coolamqp\backends\base.py +coolamqp\backends\pyamqp.py diff --git a/coolamqp/backends/base.py b/coolamqp/backends/base.py index 090b39ec5da6d4b71c571d067c09f13838d8b7fd..dfb3679252f5ff9dd52754dcaacccea9c30f0c9f 100644 --- a/coolamqp/backends/base.py +++ b/coolamqp/backends/base.py @@ -95,6 +95,13 @@ class AMQPBackend(object): :param delivery_tag: delivery tag to ack """ + def basic_qos(self, prefetch_size, prefetch_count): + """ + Issue a basic.qos(prefetch_size, prefetch_count, True) against broker + :param prefetch_size: prefetch window size in octets + :param prefetch_count: prefetch window in terms of whole messages + """ + def basic_nack(self, delivery_tag): """ NACK a message. diff --git a/coolamqp/backends/pyamqp.py b/coolamqp/backends/pyamqp.py index 0e7636a3d807fe10fba78592d92a40f9ae98acd5..f4fea16aa61055e13894884fb82888506ce8c881 100644 --- a/coolamqp/backends/pyamqp.py +++ b/coolamqp/backends/pyamqp.py @@ -80,6 +80,10 @@ class PyAMQPBackend(AMQPBackend): def exchange_delete(self, exchange): self.channel.exchange_delete(exchange.name) + @translate_exceptions + def basic_qos(self, prefetch_size, prefetch_count): + self.channel.basic_qos(prefetch_size, prefetch_count, True) + @translate_exceptions def queue_delete(self, queue): self.channel.queue_delete(queue.name) diff --git a/coolamqp/cluster.py b/coolamqp/cluster.py index 788980303bb495737bef126b3a8c78e09660fadf..995d963335301c51784939656982354b2737203e 100644 --- a/coolamqp/cluster.py +++ b/coolamqp/cluster.py @@ -2,7 +2,7 @@ import itertools import Queue from coolamqp.backends import PyAMQPBackend from .orders import SendMessage, ConsumeQueue, DeclareExchange, CancelQueue, DeleteQueue, \ - DeleteExchange + DeleteExchange, SetQoS from .messages import Exchange @@ -137,6 +137,11 @@ class Cluster(object): self.thread.order_queue.append(a) return a + def qos(self, prefetch_window, prefetch_count): + a = SetQoS(prefetch_window, prefetch_count) + self.thread.order_queue.append(a) + return a + def consume(self, queue, on_completed=None, on_failed=None): """ Start consuming from a queue diff --git a/coolamqp/handler.py b/coolamqp/handler.py index 0fd75ac30646224f29b9a4593ee6f8167c571ddd..d757a08c4e81b1e8f9abdbb481411fee2db74fb4 100644 --- a/coolamqp/handler.py +++ b/coolamqp/handler.py @@ -8,7 +8,7 @@ from .messages import Exchange from .events import ConnectionUp, ConnectionDown, ConsumerCancelled, MessageReceived from .orders import SendMessage, DeclareExchange, ConsumeQueue, CancelQueue, \ AcknowledgeMessage, NAcknowledgeMessage, DeleteQueue, \ - DeleteExchange + DeleteExchange, SetQoS logger = logging.getLogger(__name__) @@ -35,6 +35,8 @@ class ClusterHandlerThread(threading.Thread): self.backend = None self.first_connect = True + self.qos = None # or tuple (prefetch_size, prefetch_count) if QoS set + def _reconnect(self): exponential_backoff_delay = 1 @@ -50,6 +52,9 @@ class ClusterHandlerThread(threading.Thread): try: self.backend = self.cluster.backend(node, self) + if self.qos is not None: + self.backend.basic_qos(*self.qos) + for exchange in self.declared_exchanges.itervalues(): self.backend.exchange_declare(exchange) @@ -92,6 +97,9 @@ class ClusterHandlerThread(threading.Thread): try: if isinstance(order, SendMessage): self.backend.basic_publish(order.message, order.exchange, order.routing_key) + elif isinstance(order, SetQoS): + self.qos = order.qos + self.backend.basic_qos(*self.qos) elif isinstance(order, DeclareExchange): self.backend.exchange_declare(order.exchange) self.declared_exchanges[order.exchange.name] = order.exchange diff --git a/coolamqp/orders.py b/coolamqp/orders.py index fc18d76a61be2b976011d1615f63db6ea2b586ea..b7a0e6e40a0426b555f0f62d53c915a6f7a9477c 100644 --- a/coolamqp/orders.py +++ b/coolamqp/orders.py @@ -75,6 +75,12 @@ class DeleteQueue(Order): self.queue = queue +class SetQoS(Order): + """Set QoS""" + def __init__(self, prefetch_window, prefetch_count): + self.qos = (prefetch_window, prefetch_count) + + class CancelQueue(Order): """Cancel consuming from a queue""" def __init__(self, queue, on_completed=None, on_failed=None): diff --git a/examples/send_to_myself.py b/examples/send_to_myself.py index c43e0ec06ba9484e14f2a93e699a1390d0e0d04a..66b5f6d9004dcbe8ee8cdc4f60ca009f4bc9a501 100644 --- a/examples/send_to_myself.py +++ b/examples/send_to_myself.py @@ -10,6 +10,8 @@ cluster = Cluster([ClusterNode('192.168.224.31:5672', 'smok', 'smok', 'smok', he a_queue = Queue(QUEUE_NAME, auto_delete=True) cluster.consume(a_queue) +cluster.qos(0, 1) + q = time.time() while True: diff --git a/setup.py b/setup.py index 99363727e4966ad42afc2a8a16f29e730d32615b..d5b969b8a58ad7d88382569332f730a7e8e7cd80 100644 --- a/setup.py +++ b/setup.py @@ -3,7 +3,7 @@ from distutils.core import setup setup(name='CoolAMQP', - version='0.1', + version='0.2', description='The AMQP client library', author=u'DMS Serwis s.c.', author_email='piotrm@smok.co',