From 51bfe4e11f8fe63671942b40392385bd126baeeb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Ma=C5=9Blanka?= <piotr.maslanka@henrietta.com.pl> Date: Fri, 23 Dec 2016 14:51:37 +0100 Subject: [PATCH] fixed #4 not really fixed, as it's a WONTFIX, but still relatively ok --- coolamqp/backends/pyamqp.py | 18 +++++++++++++----- coolamqp/handler.py | 2 +- requirements.txt | 3 ++- setup.cfg | 2 +- tests/test_basics.py | 4 ++-- 5 files changed, 19 insertions(+), 10 deletions(-) diff --git a/coolamqp/backends/pyamqp.py b/coolamqp/backends/pyamqp.py index 1f0cd26..90bbbe3 100644 --- a/coolamqp/backends/pyamqp.py +++ b/coolamqp/backends/pyamqp.py @@ -1,10 +1,13 @@ -#coding=UTF-8 +# coding=UTF-8 """Backend using pyamqp""" +from __future__ import division import amqp import socket import functools import logging from .base import AMQPBackend, RemoteAMQPError, ConnectionFailedError +import monotonic + logger = logging.getLogger(__name__) @@ -37,6 +40,8 @@ class PyAMQPBackend(AMQPBackend): except AttributeError: pass # this does not always have to exist self.channel = self.connection.channel() + self.heartbeat = node.heartbeat or 0 + self.last_heartbeat_at = monotonic.monotonic() def shutdown(self): AMQPBackend.shutdown(self) @@ -50,11 +55,14 @@ class PyAMQPBackend(AMQPBackend): pass @translate_exceptions - def process(self, max_time=10): - self.connection.heartbeat_tick() + def process(self, max_time=1): try: - self.connection.drain_events(max_time) - except socket.timeout: + if self.heartbeat > 0: + if monotonic.monotonic() - self.last_heartbeat_at > (self.heartbeat / 2): + self.connection.heartbeat_tick(rate=self.heartbeat) + self.last_heartbeat_at = monotonic.monotonic() + self.connection.drain_events(max_time) + except socket.timeout as e: pass @translate_exceptions diff --git a/coolamqp/handler.py b/coolamqp/handler.py index e98dbc1..187f25c 100644 --- a/coolamqp/handler.py +++ b/coolamqp/handler.py @@ -158,7 +158,7 @@ class ClusterHandlerThread(threading.Thread): self.perform_order() # just drain shit - self.backend.process(max_time=1) + self.backend.process(max_time=0.05) except ConnectionFailedError as e: logger.warning('Connection to broker lost') self.cluster.connected = False diff --git a/requirements.txt b/requirements.txt index d929198..5fecbcd 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,3 @@ amqp -six \ No newline at end of file +six +monotonic diff --git a/setup.cfg b/setup.cfg index 5b368da..81af2bb 100644 --- a/setup.cfg +++ b/setup.cfg @@ -2,4 +2,4 @@ description-file = README.md [pycodestyle] -max-line-length=120 \ No newline at end of file +max-line-length=120 diff --git a/tests/test_basics.py b/tests/test_basics.py index bbd387a..ed1e4e2 100644 --- a/tests/test_basics.py +++ b/tests/test_basics.py @@ -35,12 +35,12 @@ class TestBasics(unittest.TestCase): self.amqp.consume(myq) self.amqp.send(Message('what the fuck'), '', routing_key='myqueue') - p = self.amqp.drain(wait=4) + p = self.amqp.drain(wait=1) self.assertIsInstance(p, MessageReceived) self.assertEquals(p.message.body, 'what the fuck') p.message.ack() - self.assertIs(self.amqp.drain(wait=4), None) + self.assertIs(self.amqp.drain(wait=1), None) def test_nacknowledge(self): myq = Queue('myqueue', exclusive=True) -- GitLab