diff --git a/coolamqp/attaches/consumer.py b/coolamqp/attaches/consumer.py index fdc33b46e0a1c2c461155b26eea0f4a865f35c78..c8eec69a36b3ad9e35e325c9ed91f79fa3849687 100644 --- a/coolamqp/attaches/consumer.py +++ b/coolamqp/attaches/consumer.py @@ -231,11 +231,11 @@ class Consumer(Channeler): :param sth: AMQPMethodFrame WITH basic-deliver, AMQPHeaderFrame or AMQPBodyFrame """ if isinstance(sth, BasicDeliver): - self.receiver.on_basic_deliver(sth) + self.receiver.on_basic_deliver(sth) elif isinstance(sth, AMQPBodyFrame): - self.receiver.on_body(sth.data) + self.receiver.on_body(sth.data) elif isinstance(sth, AMQPHeaderFrame): - self.receiver.on_head(sth) + self.receiver.on_head(sth) # No point in listening for more stuff, that's all the watches even listen for diff --git a/coolamqp/attaches/declarer.py b/coolamqp/attaches/declarer.py index 54ecee778d07dd9dff86c618587ba996bc0952c3..097726953d647d7aef036198a905913217605c2e 100644 --- a/coolamqp/attaches/declarer.py +++ b/coolamqp/attaches/declarer.py @@ -147,8 +147,11 @@ class Declarer(Channeler, Synchronized): Future is returned, so that user knows when it happens. - Declaring is not fast, because there is at most one declare at wire, but at least we know WHAT failed. + Exchange declarations never fail. + Of course they do, but you will be told that it succeeded. This is by design, + and due to how AMQP works. + Queue declarations CAN fail. Note that if re-declaring these fails, they will be silently discarded. You can subscribe an on_discard(Exchange | Queue) here. diff --git a/coolamqp/uplink/listener/epoll_listener.py b/coolamqp/uplink/listener/epoll_listener.py index fa67e0bee10d96bb4ec46b778c974fc23ee64bd7..727076bc1e9acc3770b7c972326ec1c8de5c02ff 100644 --- a/coolamqp/uplink/listener/epoll_listener.py +++ b/coolamqp/uplink/listener/epoll_listener.py @@ -86,8 +86,10 @@ class EpollListener(object): sock.on_read() if event & select.EPOLLOUT: + if sock.on_write(): # I'm done with sending for now + assert len(sock.data_to_send) == 0 and len(sock.priority_queue) == 0 self.epoll.modify(sock.fileno(), RW) except SocketFailed: diff --git a/coolamqp/uplink/listener/socket.py b/coolamqp/uplink/listener/socket.py index 9684243ab9ceeae7d4a35a20ed7f43f7219f055d..a24d0c101b8f9fa52dc9f654dbccda222e63c6af 100644 --- a/coolamqp/uplink/listener/socket.py +++ b/coolamqp/uplink/listener/socket.py @@ -105,7 +105,7 @@ class BaseSocket(object): :raises SocketFailed: on socket error :return: True if I'm done sending shit for now """ - if self.is_failed: return + if self.is_failed: return False while True: if len(self.data_to_send) == 0: diff --git a/tests/run.py b/tests/run.py index 9aefc071be7e3c4859d13dd2f2c7bae1822d467e..0d79aa59f2228a5e0f0042b743ee0db1ea7b1290 100644 --- a/tests/run.py +++ b/tests/run.py @@ -15,8 +15,8 @@ if __name__ == '__main__': amqp = Cluster([NODE]) amqp.start(wait=True) - a = Exchange(u'jola', type='fanout', auto_delete=True, durable=False) - bad = Exchange(u'jola', type='direct', auto_delete=True, durable=True) + a = Exchange(u'jolax', type='fanout', auto_delete=True) + bad = Exchange(u'jolax', type='direct', auto_delete=True) amqp.declare(a).result() diff --git a/tests/test_clustering/test_a.py b/tests/test_clustering/test_a.py index 6a96449e043205dcb9e75476c4249a1125291407..ad88e738986fc7159f021e0e48ededb241f3b004 100644 --- a/tests/test_clustering/test_a.py +++ b/tests/test_clustering/test_a.py @@ -5,7 +5,7 @@ Test things from __future__ import print_function, absolute_import, division import six import unittest -import time, logging, threading +import time, logging, threading, monotonic from coolamqp.objects import Message, MessageProperties, NodeDefinition, Queue, ReceivedMessage, Exchange from coolamqp.clustering import Cluster, MessageReceived, NothingMuch @@ -30,6 +30,13 @@ class TestA(unittest.TestCase): fut.result() con.cancel() + def test_actually_waits(self): + a = monotonic.monotonic() + + self.c.drain(5) + + self.assertTrue(monotonic.monotonic() - a >= 4) + def test_set_qos_but_later(self): con, fut = self.c.consume(Queue(u'hello', exclusive=True)) diff --git a/tests/test_clustering/test_exchanges.py b/tests/test_clustering/test_exchanges.py index 39cf11014c235d79fbd82e87c4ac427eed761bfd..a070d2bd94d82e6d7777cc3b39eaff916e063fc2 100644 --- a/tests/test_clustering/test_exchanges.py +++ b/tests/test_clustering/test_exchanges.py @@ -22,13 +22,11 @@ class TestExchanges(unittest.TestCase): self.c.shutdown() def test_declare_exchange(self): - a = Exchange(u'jola', type=b'fanout', auto_delete=True) - bad = Exchange(u'jola', type=b'topic', auto_delete=True) + a = Exchange(u'jolax', type=b'fanout', auto_delete=True) + bad = Exchange(u'jolax', type=b'topic', auto_delete=True) - self.c.declare(a) - - self.assertRaises(AMQPError, lambda: self.c.declare(bad).result()) - self.assertRaises(AMQPError, lambda: self.c.declare(bad).result()) + self.c.declare(a).result() + self.c.declare(bad).result() # succeeds nevertheless def test_fanout(self): x = Exchange(u'jola', type='direct', auto_delete=True)