diff --git a/coolamqp/backends/__init__.py b/coolamqp/backends/__init__.py index 31c5b38e3f4ecbce88e840a2bc9df611709232e3..d588907fe39bcc2ba005519b8da003e17c692c2d 100644 --- a/coolamqp/backends/__init__.py +++ b/coolamqp/backends/__init__.py @@ -1 +1,2 @@ -from .pyamqp import PyAMQPBackend, AMQPError, RemoteAMQPError, ConnectionFailedError \ No newline at end of file +from .pyamqp import PyAMQPBackend +from .base import AMQPError, ConnectionFailedError, RemoteAMQPError \ No newline at end of file diff --git a/coolamqp/backends/pyamqp.py b/coolamqp/backends/pyamqp.py index 1f9de7b5e509199cf9db8ca1b38b5083c85463cc..4a520fd5877818be0c07da9ef93ce8f2e699b9dc 100644 --- a/coolamqp/backends/pyamqp.py +++ b/coolamqp/backends/pyamqp.py @@ -2,7 +2,7 @@ import amqp import socket import functools -from .base import AMQPBackend, AMQPError, RemoteAMQPError, ConnectionFailedError +from .base import AMQPBackend, RemoteAMQPError, ConnectionFailedError def translate_exceptions(fun): @@ -53,7 +53,7 @@ class PyAMQPBackend(AMQPBackend): @translate_exceptions def basic_cancel(self, consumer_tag): - self.amqp_channel.basic_cancel(consumer_tag) + self.channel.basic_cancel(consumer_tag) @translate_exceptions def basic_publish(self, message, exchange, routing_key): @@ -91,8 +91,6 @@ class PyAMQPBackend(AMQPBackend): if queue.anonymous: queue.name = '' - print 'declaring queue that is %s %s %s %s' % (queue.name, queue.durable, queue.exclusive, queue.auto_delete) - qname, mc, cc = self.channel.queue_declare(queue.name, durable=queue.durable, exclusive=queue.exclusive, diff --git a/coolamqp/cluster.py b/coolamqp/cluster.py index ac423a33f457f677b9d248e4a71db479ef15e0b3..ef3338e09c2c9a894556e744e934050db4ed0e0d 100644 --- a/coolamqp/cluster.py +++ b/coolamqp/cluster.py @@ -1,7 +1,7 @@ import itertools import Queue from coolamqp.backends import PyAMQPBackend -from .orders import SendMessage, ConsumeQueue, DeclareExchange +from .orders import SendMessage, ConsumeQueue, DeclareExchange, CancelQueue from .messages import Exchange @@ -94,6 +94,19 @@ class Cluster(object): on_completed=on_completed, on_failed=on_failed)) + + def cancel(self, queue, on_completed=None, on_failed=None): + """ + Cancel consuming from a queue + + :param queue: Queue to consume from + :param on_completed: callable/0 to call when this succeeds + :param on_failed: callable/1 to call when this fails with AMQPError instance + """ + self.thread.order_queue.append(CancelQueue(queue, + on_completed=on_completed, + on_failed=on_failed)) + def consume(self, queue, on_completed=None, on_failed=None): """ Start consuming from a queue @@ -101,7 +114,7 @@ class Cluster(object): This queue will be declared to the broker. If this queue has any binds (.exchange field is not empty), queue will be binded to exchanges. - :param queue: Queue to consume from. + :param queue: Queue to consume from :param on_completed: callable/0 to call when this succeeds :param on_failed: callable/1 to call when this fails with AMQPError instance """ @@ -109,7 +122,6 @@ class Cluster(object): on_completed=on_completed, on_failed=on_failed)) - def drain(self, wait=0): """ Return a ClusterEvent on what happened, or None if nothing could be obtained diff --git a/coolamqp/handler.py b/coolamqp/handler.py index f8c8266cbd9153a61cad795a6ecda56a71a3e1f1..ae141cb77ac8f85177be26855d182ad63b571370 100644 --- a/coolamqp/handler.py +++ b/coolamqp/handler.py @@ -107,7 +107,6 @@ class ClusterHandlerThread(threading.Thread): self.backend.basic_consume(order.queue) self.queues_by_consumer_tags[order.queue.consumer_tag] = order.queue - elif isinstance(order, CancelQueue): try: q = self.queues_by_consumer_tags.pop(order.queue.consumer_tag)