Skip to content
Snippets Groups Projects
Commit 9b4a54c2 authored by Piotr Maślanka's avatar Piotr Maślanka
Browse files

added support for QoS

parent 5df95cd9
No related branches found
No related tags found
No related merge requests found
MANIFEST 0 → 100644
# file GENERATED by distutils, do NOT edit
setup.cfg
setup.py
coolamqp\__init__.py
coolamqp\cluster.py
coolamqp\events.py
coolamqp\handler.py
coolamqp\messages.py
coolamqp\orders.py
coolamqp\backends\__init__.py
coolamqp\backends\base.py
coolamqp\backends\pyamqp.py
...@@ -95,6 +95,13 @@ class AMQPBackend(object): ...@@ -95,6 +95,13 @@ class AMQPBackend(object):
:param delivery_tag: delivery tag to ack :param delivery_tag: delivery tag to ack
""" """
def basic_qos(self, prefetch_size, prefetch_count):
"""
Issue a basic.qos(prefetch_size, prefetch_count, True) against broker
:param prefetch_size: prefetch window size in octets
:param prefetch_count: prefetch window in terms of whole messages
"""
def basic_nack(self, delivery_tag): def basic_nack(self, delivery_tag):
""" """
NACK a message. NACK a message.
......
...@@ -80,6 +80,10 @@ class PyAMQPBackend(AMQPBackend): ...@@ -80,6 +80,10 @@ class PyAMQPBackend(AMQPBackend):
def exchange_delete(self, exchange): def exchange_delete(self, exchange):
self.channel.exchange_delete(exchange.name) self.channel.exchange_delete(exchange.name)
@translate_exceptions
def basic_qos(self, prefetch_size, prefetch_count):
self.channel.basic_qos(prefetch_size, prefetch_count, True)
@translate_exceptions @translate_exceptions
def queue_delete(self, queue): def queue_delete(self, queue):
self.channel.queue_delete(queue.name) self.channel.queue_delete(queue.name)
......
...@@ -2,7 +2,7 @@ import itertools ...@@ -2,7 +2,7 @@ import itertools
import Queue import Queue
from coolamqp.backends import PyAMQPBackend from coolamqp.backends import PyAMQPBackend
from .orders import SendMessage, ConsumeQueue, DeclareExchange, CancelQueue, DeleteQueue, \ from .orders import SendMessage, ConsumeQueue, DeclareExchange, CancelQueue, DeleteQueue, \
DeleteExchange DeleteExchange, SetQoS
from .messages import Exchange from .messages import Exchange
...@@ -137,6 +137,11 @@ class Cluster(object): ...@@ -137,6 +137,11 @@ class Cluster(object):
self.thread.order_queue.append(a) self.thread.order_queue.append(a)
return a return a
def qos(self, prefetch_window, prefetch_count):
a = SetQoS(prefetch_window, prefetch_count)
self.thread.order_queue.append(a)
return a
def consume(self, queue, on_completed=None, on_failed=None): def consume(self, queue, on_completed=None, on_failed=None):
""" """
Start consuming from a queue Start consuming from a queue
......
...@@ -8,7 +8,7 @@ from .messages import Exchange ...@@ -8,7 +8,7 @@ from .messages import Exchange
from .events import ConnectionUp, ConnectionDown, ConsumerCancelled, MessageReceived from .events import ConnectionUp, ConnectionDown, ConsumerCancelled, MessageReceived
from .orders import SendMessage, DeclareExchange, ConsumeQueue, CancelQueue, \ from .orders import SendMessage, DeclareExchange, ConsumeQueue, CancelQueue, \
AcknowledgeMessage, NAcknowledgeMessage, DeleteQueue, \ AcknowledgeMessage, NAcknowledgeMessage, DeleteQueue, \
DeleteExchange DeleteExchange, SetQoS
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
...@@ -35,6 +35,8 @@ class ClusterHandlerThread(threading.Thread): ...@@ -35,6 +35,8 @@ class ClusterHandlerThread(threading.Thread):
self.backend = None self.backend = None
self.first_connect = True self.first_connect = True
self.qos = None # or tuple (prefetch_size, prefetch_count) if QoS set
def _reconnect(self): def _reconnect(self):
exponential_backoff_delay = 1 exponential_backoff_delay = 1
...@@ -50,6 +52,9 @@ class ClusterHandlerThread(threading.Thread): ...@@ -50,6 +52,9 @@ class ClusterHandlerThread(threading.Thread):
try: try:
self.backend = self.cluster.backend(node, self) self.backend = self.cluster.backend(node, self)
if self.qos is not None:
self.backend.basic_qos(*self.qos)
for exchange in self.declared_exchanges.itervalues(): for exchange in self.declared_exchanges.itervalues():
self.backend.exchange_declare(exchange) self.backend.exchange_declare(exchange)
...@@ -92,6 +97,9 @@ class ClusterHandlerThread(threading.Thread): ...@@ -92,6 +97,9 @@ class ClusterHandlerThread(threading.Thread):
try: try:
if isinstance(order, SendMessage): if isinstance(order, SendMessage):
self.backend.basic_publish(order.message, order.exchange, order.routing_key) self.backend.basic_publish(order.message, order.exchange, order.routing_key)
elif isinstance(order, SetQoS):
self.qos = order.qos
self.backend.basic_qos(*self.qos)
elif isinstance(order, DeclareExchange): elif isinstance(order, DeclareExchange):
self.backend.exchange_declare(order.exchange) self.backend.exchange_declare(order.exchange)
self.declared_exchanges[order.exchange.name] = order.exchange self.declared_exchanges[order.exchange.name] = order.exchange
......
...@@ -75,6 +75,12 @@ class DeleteQueue(Order): ...@@ -75,6 +75,12 @@ class DeleteQueue(Order):
self.queue = queue self.queue = queue
class SetQoS(Order):
"""Set QoS"""
def __init__(self, prefetch_window, prefetch_count):
self.qos = (prefetch_window, prefetch_count)
class CancelQueue(Order): class CancelQueue(Order):
"""Cancel consuming from a queue""" """Cancel consuming from a queue"""
def __init__(self, queue, on_completed=None, on_failed=None): def __init__(self, queue, on_completed=None, on_failed=None):
......
...@@ -10,6 +10,8 @@ cluster = Cluster([ClusterNode('192.168.224.31:5672', 'smok', 'smok', 'smok', he ...@@ -10,6 +10,8 @@ cluster = Cluster([ClusterNode('192.168.224.31:5672', 'smok', 'smok', 'smok', he
a_queue = Queue(QUEUE_NAME, auto_delete=True) a_queue = Queue(QUEUE_NAME, auto_delete=True)
cluster.consume(a_queue) cluster.consume(a_queue)
cluster.qos(0, 1)
q = time.time() q = time.time()
while True: while True:
......
...@@ -3,7 +3,7 @@ ...@@ -3,7 +3,7 @@
from distutils.core import setup from distutils.core import setup
setup(name='CoolAMQP', setup(name='CoolAMQP',
version='0.1', version='0.2',
description='The AMQP client library', description='The AMQP client library',
author=u'DMS Serwis s.c.', author=u'DMS Serwis s.c.',
author_email='piotrm@smok.co', author_email='piotrm@smok.co',
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment