diff --git a/coolamqp/backends/pyamqp.py b/coolamqp/backends/pyamqp.py index 1f0cd26ddf3bd0cc2700daaa3b17db9e6932aaea..90bbbe3805d0a1277e66b6d7279587fa52116c34 100644 --- a/coolamqp/backends/pyamqp.py +++ b/coolamqp/backends/pyamqp.py @@ -1,10 +1,13 @@ -#coding=UTF-8 +# coding=UTF-8 """Backend using pyamqp""" +from __future__ import division import amqp import socket import functools import logging from .base import AMQPBackend, RemoteAMQPError, ConnectionFailedError +import monotonic + logger = logging.getLogger(__name__) @@ -37,6 +40,8 @@ class PyAMQPBackend(AMQPBackend): except AttributeError: pass # this does not always have to exist self.channel = self.connection.channel() + self.heartbeat = node.heartbeat or 0 + self.last_heartbeat_at = monotonic.monotonic() def shutdown(self): AMQPBackend.shutdown(self) @@ -50,11 +55,14 @@ class PyAMQPBackend(AMQPBackend): pass @translate_exceptions - def process(self, max_time=10): - self.connection.heartbeat_tick() + def process(self, max_time=1): try: - self.connection.drain_events(max_time) - except socket.timeout: + if self.heartbeat > 0: + if monotonic.monotonic() - self.last_heartbeat_at > (self.heartbeat / 2): + self.connection.heartbeat_tick(rate=self.heartbeat) + self.last_heartbeat_at = monotonic.monotonic() + self.connection.drain_events(max_time) + except socket.timeout as e: pass @translate_exceptions diff --git a/coolamqp/handler.py b/coolamqp/handler.py index e98dbc12bc0140ed0bdd02367677c6bcfa2a20c7..187f25cc9ea89fc0f81a2deb189db9c18b100a2c 100644 --- a/coolamqp/handler.py +++ b/coolamqp/handler.py @@ -158,7 +158,7 @@ class ClusterHandlerThread(threading.Thread): self.perform_order() # just drain shit - self.backend.process(max_time=1) + self.backend.process(max_time=0.05) except ConnectionFailedError as e: logger.warning('Connection to broker lost') self.cluster.connected = False diff --git a/requirements.txt b/requirements.txt index d929198bbc45e075ddb0462ea557b956c1fbfb29..5fecbcdb185943b6108d6983bf76d394d392c227 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,3 @@ amqp -six \ No newline at end of file +six +monotonic diff --git a/setup.cfg b/setup.cfg index 5b368da59afe67383812313007206ff3129174fb..81af2bb8d76f960564e3394a03f6f2b6c290ffe9 100644 --- a/setup.cfg +++ b/setup.cfg @@ -2,4 +2,4 @@ description-file = README.md [pycodestyle] -max-line-length=120 \ No newline at end of file +max-line-length=120 diff --git a/tests/test_basics.py b/tests/test_basics.py index bbd387a6173b0a207bbdf42c7e7699481bd7a785..ed1e4e21ea5fa18b11e31f9958edbb138eafe037 100644 --- a/tests/test_basics.py +++ b/tests/test_basics.py @@ -35,12 +35,12 @@ class TestBasics(unittest.TestCase): self.amqp.consume(myq) self.amqp.send(Message('what the fuck'), '', routing_key='myqueue') - p = self.amqp.drain(wait=4) + p = self.amqp.drain(wait=1) self.assertIsInstance(p, MessageReceived) self.assertEquals(p.message.body, 'what the fuck') p.message.ack() - self.assertIs(self.amqp.drain(wait=4), None) + self.assertIs(self.amqp.drain(wait=1), None) def test_nacknowledge(self): myq = Queue('myqueue', exclusive=True)