diff --git a/coolamqp/events.py b/coolamqp/events.py deleted file mode 100644 index 0dbe85087adab5dc48779423a53bafe8842bb5e6..0000000000000000000000000000000000000000 --- a/coolamqp/events.py +++ /dev/null @@ -1,53 +0,0 @@ -# coding=UTF-8 -""" -Events emitted by Cluster -""" - - -class ClusterEvent(object): - """Base class for events emitted by cluster""" - - -class ConnectionDown(ClusterEvent): - """Connection to broker has been broken""" - - -class ConnectionUp(ClusterEvent): - """Connection to broker has been (re)established""" - - def __init__(self, initial=False): - self.initial = initial #: public, is this first connection up in this cluster ever? - - -class MessageReceived(ClusterEvent): - """A message has been received from the broker""" - def __init__(self, message): - """ - :param message: ReceivedMessage instance - """ - self.message = message - - -class ConsumerCancelled(ClusterEvent): - """ - Broker cancelled a consumer of ours. - This is also generated in response to cancelling consumption from a queue - """ - - BROKER_CANCEL = 0 - REFUSED_ON_RECONNECT = 1 - USER_CANCEL = 2 - - def __init__(self, queue, reason): - """ - :param queue: Queue whose consumer was cancelled - :param reason: Reason why the consumer was cancelled - ConsumerCancelled.BROKER_CANCEL - broker informed us about cancelling - ConsumerCancelled.REFUSED_ON_RECONNECT - during a reconnect, I tried to consume an exclusive - queue and got ACCESS_REFUSED. - These messages will arrive between ConnectionDown and - ConnectionUp. - ConsumedCancelled.USER_CANCEL - user called cluster.cancel() - """ - self.queue = queue - self.reason = reason diff --git a/coolamqp/orders.py b/coolamqp/orders.py deleted file mode 100644 index 7375569ed16127918e573cf1204aa1befb062305..0000000000000000000000000000000000000000 --- a/coolamqp/orders.py +++ /dev/null @@ -1,175 +0,0 @@ -# coding=UTF-8 -""" -Orders that can be dispatched to ClusterHandlerThread -""" -from threading import Lock -import warnings - - -_NOOP_COMP = lambda: None -_NOOP_FAIL = lambda e: None - - -class Order(object): - """Base class for orders dispatched to ClusterHandlerThread""" - def __init__(self, on_completed=None, on_failed=None): - """ - Please note that callbacks will be executed BEFORE the lock is released, - but after .result is updated, ie. if - you have something like - - amqp.send(.., on_completed=hello).result() - bye() - - then hello() will be called BEFORE bye(). - Callbacks are called from CoolAMQP's internal thread. - - If this fails, then property .error_code can be read to get the error code. - and .reply_text has the reply of the server or some other reason. These are set before - callbacks are called. - - Error code is None, if not available, or AMQP constants describing errors, - eg. 502 for syntax error. - - A discarded or cancelled order is considered FAILED - """ - self.on_completed = on_completed or _NOOP_COMP - self.on_failed = on_failed or _NOOP_FAIL - self._result = None # None on non-completed - # True on completed OK - # exception instance on failed - # private - self.lock = Lock() - self.lock.acquire() - self.cancelled = False #: public - self.discarded = False #: public - self.error_code = None - self.reply_text = None - - def has_finished(self): - """Return if this task has either completed or failed""" - return self._result is not None - - def cancel(self): - """Cancel this order""" - self.cancelled = True - - def _completed(self): # called by handler - self._result = True - self.on_completed() - self.lock.release() - - def _discard(self): # called by handler - from coolamqp.backends.base import Discarded - self.discarded = True - self.on_failed(Discarded()) - self.lock.release() - - def _failed(self, e): # called by handler - """ - :param e: AMQPError instance or Cancelled instance - """ - from coolamqp.backends import Cancelled - self._result = e - if not isinstance(e, Cancelled): # a true error - self.error_code = e.code - self.reply_text = e.reply_text - - self.on_failed(e) - self.lock.release() - - def wait(self): - """Wait until this is completed and return whether the order succeeded""" - self.lock.acquire() - return self._result is True - - def has_failed(self): - """Return whether the operation failed, ie. completed but with an error code. - Cancelled and discarded ops are considered failed. - This assumes that this order has been .wait()ed upon""" - return self._result is not True - - def result(self): - """Wait until this is completed and return a response""" - warnings.warn('Use .wait() instead', PendingDeprecationWarning) - self.lock.acquire() - return self._result - - @staticmethod - def _discarded(on_completed=None, on_failed=None): # return order for a discarded message - o = Order(on_completed=on_completed, on_failed=on_failed) - self.on_completed() - - -class SendMessage(Order): - """Send a message""" - def __init__(self, message, exchange, routing_key, discard_on_fail=False, on_completed=None, on_failed=None): - Order.__init__(self, on_completed=on_completed, on_failed=on_failed) - self.message = message - self.exchange = exchange - self.discard_on_fail = discard_on_fail - self.routing_key = routing_key - - -class _Exchange(Order): - """Things with exchanges""" - def __init__(self, exchange, on_completed=None, on_failed=None): - Order.__init__(self, on_completed=on_completed, on_failed=on_failed) - self.exchange = exchange - - -class DeclareExchange(_Exchange): - """Declare an exchange""" - - -class DeleteExchange(_Exchange): - """Delete an exchange""" - - -class _Queue(Order): - """Things with queues""" - def __init__(self, queue, on_completed=None, on_failed=None): - Order.__init__(self, on_completed=on_completed, on_failed=on_failed) - self.queue = queue - - -class DeclareQueue(_Queue): - """Declare a a queue""" - - -class ConsumeQueue(_Queue): - """Declare and consume from a queue""" - def __init__(self, queue, no_ack=False, on_completed=None, on_failed=None): - _Queue.__init__(self, queue, on_completed=on_completed, on_failed=on_failed) - self.no_ack = no_ack - - -class DeleteQueue(_Queue): - """Delete a queue""" - - -class CancelQueue(_Queue): - """Cancel consuming from a queue""" - - -class SetQoS(Order): - """Set QoS""" - def __init__(self, prefetch_window, prefetch_count, global_, on_completed=None, on_failed=None): - Order.__init__(self, on_completed=on_completed, on_failed=on_failed) - self.qos = prefetch_window, prefetch_count, global_ -1 - -class _AcksAndNacks(Order): - """related to acking and nacking""" - def __init__(self, connect_id, delivery_tag, on_completed): - Order.__init__(self, on_completed=on_completed) - self.connect_id = connect_id - self.delivery_tag = delivery_tag - - -class AcknowledgeMessage(_AcksAndNacks): - """ACK a message""" - - -class NAcknowledgeMessage(_AcksAndNacks): - """NACK a message""" diff --git a/examples/send_to_myself.py b/examples/send_to_myself.py deleted file mode 100644 index bc113e54c71fc7be96d3bf035a84a63ee5c5bdb9..0000000000000000000000000000000000000000 --- a/examples/send_to_myself.py +++ /dev/null @@ -1,36 +0,0 @@ -# coding=UTF-8 -from __future__ import print_function -from coolamqp import Cluster, ClusterNode, Queue, Message, ConnectionUp, ConnectionDown, MessageReceived, ConsumerCancelled -import logging -import time - -QUEUE_NAME = 'f' - -logging.basicConfig() - -cluster = Cluster([ClusterNode('192.168.224.31:5672', 'smok', 'smok', 'smok', heartbeat=10)]).start() - -a_queue = Queue(QUEUE_NAME, auto_delete=True) -cluster.consume(a_queue) -cluster.qos(0, 1) - - -q = time.time() -while True: - if time.time() - q > 10: - q = time.time() - cluster.send(Message('hello world'), routing_key=QUEUE_NAME) - - evt = cluster.drain(2) - - if isinstance(evt, ConnectionUp): - print('Connection is up') - elif isinstance(evt, ConnectionDown): - print('Connection is down') - elif isinstance(evt, MessageReceived): - print('Message is %s' % (evt.message.body, )) - evt.message.ack() - elif isinstance(evt, ConsumerCancelled): - print('Consumer %s cancelled' % (evt.queue.name, )) - - diff --git a/tests/test_basics.py b/tests/test_basics.py deleted file mode 100644 index b0051c672514ac182a59d92607b706d6382d9a39..0000000000000000000000000000000000000000 --- a/tests/test_basics.py +++ /dev/null @@ -1,139 +0,0 @@ -# coding=UTF-8 -from __future__ import absolute_import, division, print_function -import six - -from coolamqp import Cluster, ClusterNode, Queue, MessageReceived, ConnectionUp, ConsumerCancelled, Message, Exchange - -from tests.utils import getamqp, CoolAMQPTestCase - -class TestThings(CoolAMQPTestCase): - INIT_AMQP = False - - def test_different_constructor_for_clusternode(self): - cn = ClusterNode(host='127.0.0.1', user='guest', password='guest', virtual_host='/') - amqp = Cluster([cn]) - amqp.start() - self.assertIsInstance(amqp.drain(1), ConnectionUp) - amqp.shutdown() - - -#todo discard on fail needs tests - -class TestBasics(CoolAMQPTestCase): - - def test_acknowledge(self): - myq = Queue('myqueue', exclusive=True) - - self.amqp.consume(myq) - self.amqp.send(Message(b'what the fuck'), '', routing_key='myqueue') - - p = self.drainTo(MessageReceived, 4) - self.assertEquals(p.message.body, b'what the fuck') - self.assertIsInstance(p.message.body, six.binary_type) - p.message.ack() - - self.assertIs(self.amqp.drain(wait=1), None) - - def test_send_bullshit(self): - self.assertRaises(TypeError, lambda: Message(u'what the fuck')) - - def test_send_nonobvious_bullshit(self): - self.assertEquals(Message(bytearray(b'what the fuck')).body, b'what the fuck') - - def test_nacknowledge(self): - myq = Queue('myqueue', exclusive=True) - - self.amqp.consume(myq) - self.amqp.send(Message(b'what the fuck'), '', routing_key='myqueue') - - p = self.drainTo(MessageReceived, 4) - self.assertEquals(p.message.body, b'what the fuck') - p.message.nack() - - p = self.drainTo(MessageReceived, 4) - self.assertEquals(p.message.body, b'what the fuck') - - def test_bug_hangs(self): - p = Queue('lol', exclusive=True) - self.amqp.consume(p) - self.amqp.consume(p).result() - - def test_consume_declare(self): - """Spawn a second connection. One declares an exclusive queue, other tries to consume from it""" - with self.new_amqp_connection() as amqp2: - - has_failed = {'has_failed': False} - - self.amqp.declare_queue(Queue('lol', exclusive=True)).result() - amqp2.consume(Queue('lol', exclusive=True), on_failed=lambda e: has_failed.update({'has_failed': True})).result() - - self.assertTrue(has_failed['has_failed']) - - def test_qos(self): - self.amqp.qos(0, 1) - - self.amqp.consume(Queue('lol', exclusive=True)).result() - self.amqp.send(Message(b'what the fuck'), '', routing_key='lol') - self.amqp.send(Message(b'what the fuck'), '', routing_key='lol') - - p = self.drainTo(MessageReceived, 4) - - self.drainToNone(5) - p.message.ack() - self.assertIsInstance(self.amqp.drain(wait=4), MessageReceived) - - def test_consume_twice(self): - """Spawn a second connection and try to consume an exclusive queue twice""" - with self.new_amqp_connection() as amqp2: - - has_failed = {'has_failed': False} - - self.amqp.consume(Queue('lol', exclusive=True)).result() - amqp2.consume(Queue('lol', exclusive=True), on_failed=lambda e: has_failed.update({'has_failed': True})).result() - - self.assertTrue(has_failed['has_failed']) - - def test_send_and_receive(self): - myq = Queue('myqueue', exclusive=True) - - self.amqp.consume(myq) - self.amqp.send(Message(b'what the fuck'), '', routing_key='myqueue') - - self.assertEquals(self.drainTo(MessageReceived, 3).message.body, b'what the fuck') - - def test_consumer_cancelled_on_queue_deletion(self): - myq = Queue('myqueue', exclusive=True) - - self.amqp.consume(myq) - self.amqp.delete_queue(myq) - - self.assertEquals(self.drainTo(ConsumerCancelled, 5).reason, ConsumerCancelled.BROKER_CANCEL) - - def test_consumer_cancelled_on_consumer_cancel(self): - myq = Queue('myqueue', exclusive=True) - - self.amqp.consume(myq) - self.amqp.cancel(myq) - - c = self.drainTo(ConsumerCancelled, 10) - self.assertEquals(c.reason, ConsumerCancelled.USER_CANCEL) - - def test_delete_exchange(self): - xchg = Exchange('a_fanout', type='fanout') - self.amqp.declare_exchange(xchg) - self.amqp.delete_exchange(xchg).result() - - def test_exchanges(self): - xchg = Exchange('a_fanout', type='fanout') - self.amqp.declare_exchange(xchg) - - q1 = Queue('q1', exclusive=True, exchange=xchg) - q2 = Queue('q2', exclusive=True, exchange=xchg) - - self.amqp.consume(q1) - self.amqp.consume(q2) - - self.amqp.send(Message(b'hello'), xchg) - - self.drainTo(MessageReceived, 4) - self.drainTo(MessageReceived, 4) diff --git a/tests/test_failures.py b/tests/test_failures.py deleted file mode 100644 index c658026279a85db9039718dbb51a76173ff1cbf1..0000000000000000000000000000000000000000 --- a/tests/test_failures.py +++ /dev/null @@ -1,146 +0,0 @@ -# coding=UTF-8 -from __future__ import absolute_import, division, print_function - -import unittest -import time -from coolamqp import Cluster, ClusterNode, Queue, MessageReceived, ConnectionUp, \ - ConnectionDown, ConsumerCancelled, Message, Exchange - - -NODE = ClusterNode('127.0.0.1', 'guest', 'guest') - -from tests.utils import CoolAMQPTestCase - - -class TestSpecialCases(CoolAMQPTestCase): - INIT_AMQP = False - - def test_termination_while_disconnect(self): - self.amqp = Cluster([NODE]) - self.amqp.start() - self.assertIsInstance(self.amqp.drain(wait=1), ConnectionUp) - - self.fail_amqp() - time.sleep(5) - self.assertIsInstance(self.amqp.drain(wait=1), ConnectionDown) - - self.amqp.shutdown() - self.assertIsNone(self.amqp.thread.backend) - self.assertFalse(self.amqp.connected) - - self.unfail_amqp() - - -class TestFailures(CoolAMQPTestCase): - - def test_cancel_not_consumed_queue(self): - self.amqp.cancel(Queue('hello world')).result() - - def test_longer_disconnects(self): - self.fail_amqp() - time.sleep(3) - self.drainTo(ConnectionDown, 4) - time.sleep(12) - self.unfail_amqp() - self.drainTo(ConnectionUp, 35) - - def test_qos_redeclared_on_fail(self): - self.amqp.qos(0, 1).result() - - self.restart_rmq() - - self.amqp.consume(Queue('lol', exclusive=True)).result() - self.amqp.send(Message(b'what the fuck'), '', routing_key='lol') - self.amqp.send(Message(b'what the fuck'), '', routing_key='lol') - - p = self.drainTo(MessageReceived, 4) - - self.drainToNone(5) - p.message.ack() - self.assertIsInstance(self.amqp.drain(wait=4), MessageReceived) - - def test_connection_flags_are_okay(self): - self.fail_amqp() - self.drainTo(ConnectionDown, 8) - self.assertFalse(self.amqp.connected) - self.unfail_amqp() - self.drainTo(ConnectionUp, 5) - self.assertTrue(self.amqp.connected) - - def test_sending_a_message_is_cancelled(self): - """are messages generated at all? does it reconnect?""" - - self.amqp.consume(Queue('wtf1', exclusive=True)) - - self.fail_amqp() - self.drainTo(ConnectionDown, 5) - - p = self.amqp.send(Message(b'what the fuck'), routing_key='wtf1') - p.cancel() - self.assertTrue(p.wait()) - self.assertFalse(p.has_failed()) - - self.fail_unamqp() - self.drainToAny([ConnectionUp], 30, forbidden=[MessageReceived]) - - def test_qos_after_failure(self): - self.amqp.qos(0, 1) - - self.amqp.consume(Queue('lol', exclusive=True)).result() - self.amqp.send(Message(b'what the fuck'), '', routing_key='lol') - self.amqp.send(Message(b'what the fuck'), '', routing_key='lol') - - p = self.drainTo(MessageReceived, 4) - - self.assertIsNone(self.amqp.drain(wait=5)) - p.message.ack() - self.drainTo(MessageReceived, 4) - - self.restart_rmq() - - self.amqp.send(Message(b'what the fuck'), '', routing_key='lol') - self.amqp.send(Message(b'what the fuck'), '', routing_key='lol') - - p = self.drainTo(MessageReceived, 4) - - self.drainToNone(5) - p.message.ack() - self.drainTo(MessageReceived, 4) - - def test_connection_down_and_up_redeclare_queues(self): - """are messages generated at all? does it reconnect?""" - q1 = Queue('wtf1', exclusive=True, auto_delete=True) - self.amqp.consume(q1).result() - - self.restart_rmq() - - self.amqp.send(Message(b'what the fuck'), routing_key='wtf1') - - self.drainTo(MessageReceived, 10) - - def test_exchanges_are_redeclared(self): - xchg = Exchange('a_fanout', type='fanout') - self.amqp.declare_exchange(xchg) - - q1 = Queue('q1', exclusive=True, exchange=xchg) - q2 = Queue('q2', exclusive=True, exchange=xchg) - - self.amqp.consume(q1) - self.amqp.consume(q2).result() - - self.restart_rmq() - - self.amqp.send(Message(b'hello'), xchg) - self.drainTo([MessageReceived, MessageReceived], 20) - - def test_consuming_exclusive_queue(self): - # declare and eat - q1 = Queue('q1', exclusive=True) - - self.amqp.consume(q1).wait() - - with self.new_amqp_connection() as amqp2: - q2 = Queue('q1', exclusive=True) - - r = amqp2.consume(q2) - self.assertFalse(r.wait()) diff --git a/tests/test_noack.py b/tests/test_noack.py deleted file mode 100644 index 2720947bb85057fbbaf9dd90a755232438e682d8..0000000000000000000000000000000000000000 --- a/tests/test_noack.py +++ /dev/null @@ -1,109 +0,0 @@ -# coding=UTF-8 -from __future__ import absolute_import, division, print_function -import six -import unittest -import time - -from tests.utils import CoolAMQPTestCase -from coolamqp import Cluster, ClusterNode, Queue, MessageReceived, ConnectionUp, ConnectionDown, ConsumerCancelled, Message, Exchange - -class TestNoAcknowledge(CoolAMQPTestCase): - def test_noack_works(self): - myq = Queue('myqueue', exclusive=True) - - self.amqp.qos(0, 1, False) - - self.amqp.consume(myq, no_ack=True) - - self.amqp.send(Message(b'what the fuck'), '', routing_key='myqueue') - self.amqp.send(Message(b'what the fuck'), '', routing_key='myqueue') - self.amqp.send(Message(b'what the fuck'), '', routing_key='myqueue') - - self.drainTo([MessageReceived, MessageReceived, MessageReceived], [3, 3, 3]) - - def test_noack_works_after_restart(self): - myq = Queue('myqueue', exclusive=True) - - self.amqp.qos(0, 1, False) - - self.amqp.consume(myq, no_ack=True) - - self.amqp.send(Message(b'what the fuck'), '', routing_key='myqueue') - self.amqp.send(Message(b'what the fuck'), '', routing_key='myqueue') - self.amqp.send(Message(b'what the fuck'), '', routing_key='myqueue') - - self.drainTo([MessageReceived, MessageReceived, MessageReceived], [3, 3, 3]) - - self.restart_rmq() - - self.amqp.send(Message(b'what the fuck'), routing_key='myqueue') - self.amqp.send(Message(b'what the fuck'), routing_key='myqueue') - self.amqp.send(Message(b'what the fuck'), routing_key='myqueue') - - self.drainTo([MessageReceived, MessageReceived, MessageReceived], [3, 3, 3]) - - def test_noack_coexists(self): - self.amqp.qos(0, 1, False) - - self.amqp.consume(Queue('myqueue', exclusive=True), no_ack=True) - self.amqp.consume(Queue('myqueue2', exclusive=True)) - - msg = Message(b'zz') - - for i in range(3): - self.amqp.send(msg, routing_key='myqueue') - self.amqp.send(msg, routing_key='myqueue2') - - mq2s = [] - for i in range(4): - # I should have received 3 messages from myqueue, and 2 from myqueue2 - print('beng') - mer = self.drainTo(MessageReceived) - if mer.message.routing_key == 'myqueue2': - mq2s.append(mer.message) - - # Should receive nothing, since not acked - self.assertIsNone(self.amqp.drain(wait=4)) - - self.assertEquals(len(mq2s), 1) - - # ack and receive - for me in mq2s: me.ack() - self.drainTo(MessageReceived, 1).message.ack() # 2nd - self.drainTo(MessageReceived, 1).message.ack() # 3rd - - @unittest.skip('demonstrates a py-amqp bug') - def test_noack_coexists_empty_message_body(self): - self.amqp.qos(0, 1, False) - - self.amqp.consume(Queue('myqueue', exclusive=True), no_ack=True) - self.amqp.consume(Queue('myqueue2', exclusive=True)) - - msg = Message(b'') # if this is empty, py-amqp fails - - for i in range(3): - self.amqp.send(msg, routing_key='myqueue') - self.amqp.send(msg, routing_key='myqueue2') - - # And here connection with the broker snaps ..... - - mq2s = [] - for i in range(4): - # I should have received 3 messages from myqueue, and 2 from myqueue2 - mer = self.drainTo(MessageReceived) - if mer.message.routing_key == 'myqueue2': - mq2s.append(mer.message) - - # Should receive nothing, since not acked - self.assertIsNone(self.amqp.drain(wait=4)) - - self.assertEquals(len(mq2s), 1) - - # ack and receive - for me in mq2s: me.ack() - mer = self.amqp.drain(wait=1) # 2nd - self.assertIsInstance(mer, MessageReceived) - mer.message.ack() - mer = self.amqp.drain(wait=1) # 3rd - self.assertIsInstance(mer, MessageReceived) - mer.message.ack() \ No newline at end of file diff --git a/tests/test_performance.py b/tests/test_performance.py deleted file mode 100644 index d5bc651ccd7b8f0c9380f450ec0e783111cadc8b..0000000000000000000000000000000000000000 --- a/tests/test_performance.py +++ /dev/null @@ -1,24 +0,0 @@ -# coding=UTF-8 -from __future__ import absolute_import, division, print_function -from tests.utils import CoolAMQPTestCase -import six -import time - -from coolamqp import Cluster, ClusterNode, Queue, MessageReceived, ConnectionUp, \ - ConnectionDown, ConsumerCancelled, Message, Exchange - - -class TestBasics(CoolAMQPTestCase): - def setUp(self): - self.amqp = Cluster([ClusterNode('127.0.0.1', 'guest', 'guest')]) - self.amqp.start() - self.assertIsInstance(self.amqp.drain(1), ConnectionUp) - - def tearDown(self): - self.amqp.shutdown() - - def test_sending_a_message(self): - - with self.takes_less_than(0.5): - self.amqp.send(Message(b''), routing_key='nowhere').result() -