From bd9cabd138348694254f9bf9400659039cf3476c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Ma=C5=9Blanka?= <piotr.maslanka@henrietta.com.pl> Date: Sat, 28 May 2016 01:28:45 +0200 Subject: [PATCH] consumers can be cancelled --- coolamqp/backends/__init__.py | 3 ++- coolamqp/backends/pyamqp.py | 6 ++---- coolamqp/cluster.py | 18 +++++++++++++++--- coolamqp/handler.py | 1 - 4 files changed, 19 insertions(+), 9 deletions(-) diff --git a/coolamqp/backends/__init__.py b/coolamqp/backends/__init__.py index 31c5b38..d588907 100644 --- a/coolamqp/backends/__init__.py +++ b/coolamqp/backends/__init__.py @@ -1 +1,2 @@ -from .pyamqp import PyAMQPBackend, AMQPError, RemoteAMQPError, ConnectionFailedError \ No newline at end of file +from .pyamqp import PyAMQPBackend +from .base import AMQPError, ConnectionFailedError, RemoteAMQPError \ No newline at end of file diff --git a/coolamqp/backends/pyamqp.py b/coolamqp/backends/pyamqp.py index 1f9de7b..4a520fd 100644 --- a/coolamqp/backends/pyamqp.py +++ b/coolamqp/backends/pyamqp.py @@ -2,7 +2,7 @@ import amqp import socket import functools -from .base import AMQPBackend, AMQPError, RemoteAMQPError, ConnectionFailedError +from .base import AMQPBackend, RemoteAMQPError, ConnectionFailedError def translate_exceptions(fun): @@ -53,7 +53,7 @@ class PyAMQPBackend(AMQPBackend): @translate_exceptions def basic_cancel(self, consumer_tag): - self.amqp_channel.basic_cancel(consumer_tag) + self.channel.basic_cancel(consumer_tag) @translate_exceptions def basic_publish(self, message, exchange, routing_key): @@ -91,8 +91,6 @@ class PyAMQPBackend(AMQPBackend): if queue.anonymous: queue.name = '' - print 'declaring queue that is %s %s %s %s' % (queue.name, queue.durable, queue.exclusive, queue.auto_delete) - qname, mc, cc = self.channel.queue_declare(queue.name, durable=queue.durable, exclusive=queue.exclusive, diff --git a/coolamqp/cluster.py b/coolamqp/cluster.py index ac423a3..ef3338e 100644 --- a/coolamqp/cluster.py +++ b/coolamqp/cluster.py @@ -1,7 +1,7 @@ import itertools import Queue from coolamqp.backends import PyAMQPBackend -from .orders import SendMessage, ConsumeQueue, DeclareExchange +from .orders import SendMessage, ConsumeQueue, DeclareExchange, CancelQueue from .messages import Exchange @@ -94,6 +94,19 @@ class Cluster(object): on_completed=on_completed, on_failed=on_failed)) + + def cancel(self, queue, on_completed=None, on_failed=None): + """ + Cancel consuming from a queue + + :param queue: Queue to consume from + :param on_completed: callable/0 to call when this succeeds + :param on_failed: callable/1 to call when this fails with AMQPError instance + """ + self.thread.order_queue.append(CancelQueue(queue, + on_completed=on_completed, + on_failed=on_failed)) + def consume(self, queue, on_completed=None, on_failed=None): """ Start consuming from a queue @@ -101,7 +114,7 @@ class Cluster(object): This queue will be declared to the broker. If this queue has any binds (.exchange field is not empty), queue will be binded to exchanges. - :param queue: Queue to consume from. + :param queue: Queue to consume from :param on_completed: callable/0 to call when this succeeds :param on_failed: callable/1 to call when this fails with AMQPError instance """ @@ -109,7 +122,6 @@ class Cluster(object): on_completed=on_completed, on_failed=on_failed)) - def drain(self, wait=0): """ Return a ClusterEvent on what happened, or None if nothing could be obtained diff --git a/coolamqp/handler.py b/coolamqp/handler.py index f8c8266..ae141cb 100644 --- a/coolamqp/handler.py +++ b/coolamqp/handler.py @@ -107,7 +107,6 @@ class ClusterHandlerThread(threading.Thread): self.backend.basic_consume(order.queue) self.queues_by_consumer_tags[order.queue.consumer_tag] = order.queue - elif isinstance(order, CancelQueue): try: q = self.queues_by_consumer_tags.pop(order.queue.consumer_tag) -- GitLab