Skip to content
Snippets Groups Projects
Commit bd9cabd1 authored by Piotr Maślanka's avatar Piotr Maślanka
Browse files

consumers can be cancelled

parent 860356f7
No related branches found
No related tags found
No related merge requests found
from .pyamqp import PyAMQPBackend, AMQPError, RemoteAMQPError, ConnectionFailedError
\ No newline at end of file
from .pyamqp import PyAMQPBackend
from .base import AMQPError, ConnectionFailedError, RemoteAMQPError
\ No newline at end of file
......@@ -2,7 +2,7 @@
import amqp
import socket
import functools
from .base import AMQPBackend, AMQPError, RemoteAMQPError, ConnectionFailedError
from .base import AMQPBackend, RemoteAMQPError, ConnectionFailedError
def translate_exceptions(fun):
......@@ -53,7 +53,7 @@ class PyAMQPBackend(AMQPBackend):
@translate_exceptions
def basic_cancel(self, consumer_tag):
self.amqp_channel.basic_cancel(consumer_tag)
self.channel.basic_cancel(consumer_tag)
@translate_exceptions
def basic_publish(self, message, exchange, routing_key):
......@@ -91,8 +91,6 @@ class PyAMQPBackend(AMQPBackend):
if queue.anonymous:
queue.name = ''
print 'declaring queue that is %s %s %s %s' % (queue.name, queue.durable, queue.exclusive, queue.auto_delete)
qname, mc, cc = self.channel.queue_declare(queue.name,
durable=queue.durable,
exclusive=queue.exclusive,
......
import itertools
import Queue
from coolamqp.backends import PyAMQPBackend
from .orders import SendMessage, ConsumeQueue, DeclareExchange
from .orders import SendMessage, ConsumeQueue, DeclareExchange, CancelQueue
from .messages import Exchange
......@@ -94,6 +94,19 @@ class Cluster(object):
on_completed=on_completed,
on_failed=on_failed))
def cancel(self, queue, on_completed=None, on_failed=None):
"""
Cancel consuming from a queue
:param queue: Queue to consume from
:param on_completed: callable/0 to call when this succeeds
:param on_failed: callable/1 to call when this fails with AMQPError instance
"""
self.thread.order_queue.append(CancelQueue(queue,
on_completed=on_completed,
on_failed=on_failed))
def consume(self, queue, on_completed=None, on_failed=None):
"""
Start consuming from a queue
......@@ -101,7 +114,7 @@ class Cluster(object):
This queue will be declared to the broker. If this queue has any binds
(.exchange field is not empty), queue will be binded to exchanges.
:param queue: Queue to consume from.
:param queue: Queue to consume from
:param on_completed: callable/0 to call when this succeeds
:param on_failed: callable/1 to call when this fails with AMQPError instance
"""
......@@ -109,7 +122,6 @@ class Cluster(object):
on_completed=on_completed,
on_failed=on_failed))
def drain(self, wait=0):
"""
Return a ClusterEvent on what happened, or None if nothing could be obtained
......
......@@ -107,7 +107,6 @@ class ClusterHandlerThread(threading.Thread):
self.backend.basic_consume(order.queue)
self.queues_by_consumer_tags[order.queue.consumer_tag] = order.queue
elif isinstance(order, CancelQueue):
try:
q = self.queues_by_consumer_tags.pop(order.queue.consumer_tag)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment