From 3225a685b24d45fa24141303ee570a80fa99550c Mon Sep 17 00:00:00 2001 From: Piotr Maslanka <piotr.maslanka@henrietta.com.pl> Date: Thu, 12 Jan 2017 15:01:53 +0100 Subject: [PATCH] bugs and things --- coolamqp/attaches/consumer.py | 6 +++--- coolamqp/attaches/declarer.py | 5 ++++- coolamqp/uplink/listener/epoll_listener.py | 2 ++ coolamqp/uplink/listener/socket.py | 2 +- tests/run.py | 4 ++-- tests/test_clustering/test_a.py | 9 ++++++++- tests/test_clustering/test_exchanges.py | 10 ++++------ 7 files changed, 24 insertions(+), 14 deletions(-) diff --git a/coolamqp/attaches/consumer.py b/coolamqp/attaches/consumer.py index fdc33b4..c8eec69 100644 --- a/coolamqp/attaches/consumer.py +++ b/coolamqp/attaches/consumer.py @@ -231,11 +231,11 @@ class Consumer(Channeler): :param sth: AMQPMethodFrame WITH basic-deliver, AMQPHeaderFrame or AMQPBodyFrame """ if isinstance(sth, BasicDeliver): - self.receiver.on_basic_deliver(sth) + self.receiver.on_basic_deliver(sth) elif isinstance(sth, AMQPBodyFrame): - self.receiver.on_body(sth.data) + self.receiver.on_body(sth.data) elif isinstance(sth, AMQPHeaderFrame): - self.receiver.on_head(sth) + self.receiver.on_head(sth) # No point in listening for more stuff, that's all the watches even listen for diff --git a/coolamqp/attaches/declarer.py b/coolamqp/attaches/declarer.py index 54ecee7..0977269 100644 --- a/coolamqp/attaches/declarer.py +++ b/coolamqp/attaches/declarer.py @@ -147,8 +147,11 @@ class Declarer(Channeler, Synchronized): Future is returned, so that user knows when it happens. - Declaring is not fast, because there is at most one declare at wire, but at least we know WHAT failed. + Exchange declarations never fail. + Of course they do, but you will be told that it succeeded. This is by design, + and due to how AMQP works. + Queue declarations CAN fail. Note that if re-declaring these fails, they will be silently discarded. You can subscribe an on_discard(Exchange | Queue) here. diff --git a/coolamqp/uplink/listener/epoll_listener.py b/coolamqp/uplink/listener/epoll_listener.py index fa67e0b..727076b 100644 --- a/coolamqp/uplink/listener/epoll_listener.py +++ b/coolamqp/uplink/listener/epoll_listener.py @@ -86,8 +86,10 @@ class EpollListener(object): sock.on_read() if event & select.EPOLLOUT: + if sock.on_write(): # I'm done with sending for now + assert len(sock.data_to_send) == 0 and len(sock.priority_queue) == 0 self.epoll.modify(sock.fileno(), RW) except SocketFailed: diff --git a/coolamqp/uplink/listener/socket.py b/coolamqp/uplink/listener/socket.py index 9684243..a24d0c1 100644 --- a/coolamqp/uplink/listener/socket.py +++ b/coolamqp/uplink/listener/socket.py @@ -105,7 +105,7 @@ class BaseSocket(object): :raises SocketFailed: on socket error :return: True if I'm done sending shit for now """ - if self.is_failed: return + if self.is_failed: return False while True: if len(self.data_to_send) == 0: diff --git a/tests/run.py b/tests/run.py index 9aefc07..0d79aa5 100644 --- a/tests/run.py +++ b/tests/run.py @@ -15,8 +15,8 @@ if __name__ == '__main__': amqp = Cluster([NODE]) amqp.start(wait=True) - a = Exchange(u'jola', type='fanout', auto_delete=True, durable=False) - bad = Exchange(u'jola', type='direct', auto_delete=True, durable=True) + a = Exchange(u'jolax', type='fanout', auto_delete=True) + bad = Exchange(u'jolax', type='direct', auto_delete=True) amqp.declare(a).result() diff --git a/tests/test_clustering/test_a.py b/tests/test_clustering/test_a.py index 6a96449..ad88e73 100644 --- a/tests/test_clustering/test_a.py +++ b/tests/test_clustering/test_a.py @@ -5,7 +5,7 @@ Test things from __future__ import print_function, absolute_import, division import six import unittest -import time, logging, threading +import time, logging, threading, monotonic from coolamqp.objects import Message, MessageProperties, NodeDefinition, Queue, ReceivedMessage, Exchange from coolamqp.clustering import Cluster, MessageReceived, NothingMuch @@ -30,6 +30,13 @@ class TestA(unittest.TestCase): fut.result() con.cancel() + def test_actually_waits(self): + a = monotonic.monotonic() + + self.c.drain(5) + + self.assertTrue(monotonic.monotonic() - a >= 4) + def test_set_qos_but_later(self): con, fut = self.c.consume(Queue(u'hello', exclusive=True)) diff --git a/tests/test_clustering/test_exchanges.py b/tests/test_clustering/test_exchanges.py index 39cf110..a070d2b 100644 --- a/tests/test_clustering/test_exchanges.py +++ b/tests/test_clustering/test_exchanges.py @@ -22,13 +22,11 @@ class TestExchanges(unittest.TestCase): self.c.shutdown() def test_declare_exchange(self): - a = Exchange(u'jola', type=b'fanout', auto_delete=True) - bad = Exchange(u'jola', type=b'topic', auto_delete=True) + a = Exchange(u'jolax', type=b'fanout', auto_delete=True) + bad = Exchange(u'jolax', type=b'topic', auto_delete=True) - self.c.declare(a) - - self.assertRaises(AMQPError, lambda: self.c.declare(bad).result()) - self.assertRaises(AMQPError, lambda: self.c.declare(bad).result()) + self.c.declare(a).result() + self.c.declare(bad).result() # succeeds nevertheless def test_fanout(self): x = Exchange(u'jola', type='direct', auto_delete=True) -- GitLab