diff --git a/CHANGELOG.md b/CHANGELOG.md index 1333119f3740919c6970d993a28d585377c9fb97..2e17f43d709b05d96f6dfa05c57cd459575523de 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,7 @@ v2.0.0 * declare will expect qos to be given as an integer, and will be set as prefetch_count, since RabbitMQ no longer supports prefetch_size * same can be said of Consumer.set_qos(prefetch_count) + * added Cluster. v1.5.0 ====== diff --git a/coolamqp/attaches/declarer.py b/coolamqp/attaches/declarer.py index eea1f2ccd08b95151b2b5a52d76cf40bd56f170b..b47818152e7419642511527243cf2a1b0084632e 100644 --- a/coolamqp/attaches/declarer.py +++ b/coolamqp/attaches/declarer.py @@ -192,9 +192,6 @@ class Declarer(Channeler, Synchronized): Channeler.__init__(self) Synchronized.__init__(self) self.cluster = cluster - self.declared = set() # since Queues and Exchanges are hashable... - # anonymous queues aren't, but we reject those - # persistent self.left_to_declare = collections.deque() # since last disconnect. persistent+transient # deque of Operation objects @@ -219,10 +216,6 @@ class Declarer(Channeler, Synchronized): while len(self.left_to_declare) > 0: self.left_to_declare.pop().on_connection_dead() - # recast current declarations as new operations - for dec in self.declared: - self.left_to_declare.append(Operation(self, dec)) - super(Declarer, self).on_close() return @@ -267,7 +260,7 @@ class Declarer(Channeler, Synchronized): return fut - def declare(self, obj, persistent=False, span=None): + def declare(self, obj, span=None): """ Schedule to have an object declared. @@ -280,11 +273,7 @@ class Declarer(Channeler, Synchronized): Queue declarations CAN fail. - Note that if re-declaring these fails, they will be silently discarded. - You can subscribe an on_discard(Exchange | Queue) here. - :param obj: Exchange or Queue instance - :param persistent: will be redeclared upon disconnect. To remove, use "undeclare" :param span: span if opentracing is installed :return: a Future instance :raise ValueError: tried to declare anonymous queue @@ -298,10 +287,6 @@ class Declarer(Channeler, Synchronized): fut = Future() fut.set_running_or_notify_cancel() - if persistent: - if obj not in self.declared: - self.declared.add(obj) - self.left_to_declare.append(Operation(self, obj, fut, span, enqueued_span)) self._do_operations() diff --git a/coolamqp/clustering/cluster.py b/coolamqp/clustering/cluster.py index 3be5d873691ac33ca01de620f8564e2b51c87f11..e62fbb3ef90b309a692462beacdfe9f44bb383d2 100644 --- a/coolamqp/clustering/cluster.py +++ b/coolamqp/clustering/cluster.py @@ -89,6 +89,7 @@ class Cluster(object): self.pub_tr = None # type: Publisher self.pub_na = None # type: Publisher self.decl = None # type: Declarer + self.on_fail = None if on_fail is not None: def decorated(): @@ -96,10 +97,8 @@ class Cluster(object): on_fail() self.on_fail = decorated - else: - self.on_fail = None - def bind(self, queue, exchange, routing_key, persistent=False, span=None, + def bind(self, queue, exchange, routing_key, span=None, dont_trace=False, arguments=None): """ Bind a queue to an exchange @@ -114,12 +113,10 @@ class Cluster(object): else: child_span = None fut = self.decl.declare(QueueBind(queue, exchange, routing_key, arguments), - persistent=persistent, span=child_span) return close_future(fut, child_span) def declare(self, obj, # type: tp.Union[Queue, Exchange] - persistent=False, # type: bool span=None, # type: tp.Optional[opentracing.Span] dont_trace=False # type: bool ): # type: (...) -> concurrent.futures.Future @@ -132,7 +129,6 @@ class Cluster(object): AMQP error 404: NOT_FOUND, so try to declare your exchanges before your queues. :param obj: Queue/Exchange object - :param persistent: should it be redefined upon reconnect? :param span: optional parent span, if opentracing is installed :param dont_trace: if True, a span won't be output :return: Future @@ -144,7 +140,7 @@ class Cluster(object): child_span = self._make_span('declare', span) else: child_span = None - fut = self.decl.declare(obj, persistent=persistent, span=child_span) + fut = self.decl.declare(obj, span=child_span) return close_future(fut, child_span) def drain(self, timeout, span=None, dont_trace=False): # type: (float) -> Event @@ -371,3 +367,9 @@ class Cluster(object): self.listener.terminate() if wait: self.listener.join() + + def is_shutdown(self): + """ + :return: bool, if this was started and later disconnected. + """ + return self.started and not self.connected diff --git a/coolamqp/clustering/single.py b/coolamqp/clustering/single.py index 1c856da672982b3ddd0f2604da30caa6ea5a7e32..634574a8fa14b2aae61dc3de900bc9554d7268e7 100644 --- a/coolamqp/clustering/single.py +++ b/coolamqp/clustering/single.py @@ -75,7 +75,6 @@ class SingleNodeReconnector(object): return self.connection = None - self.listener_thread.call_next_io_event(self.connect) def shutdown(self): """Close this connection""" diff --git a/coolamqp/uplink/listener/thread.py b/coolamqp/uplink/listener/thread.py index 69424de8796b5e2df3c73572bd6f004271522543..be8ec00dcdc0029385e66552259d36e62d15df5e 100644 --- a/coolamqp/uplink/listener/thread.py +++ b/coolamqp/uplink/listener/thread.py @@ -54,18 +54,6 @@ class ListenerThread(threading.Thread): self._call_next_io_event = Callable(oneshots=True) self.listener = None # type: BaseListener - def call_next_io_event(self, callable): - """ - Call callable after current I/O event is fully processed - - sometimes many callables are called in response to single - I/O (eg. teardown, startup). This guarantees a call after - all these are done. - :param callable: callable/0 - """ - pass -# self._call_next_io_event.add(callable) - dummy that out, causes AssertionError to appear - def terminate(self): self.terminating = True diff --git a/tests/test_clustering/test_a.py b/tests/test_clustering/test_a.py index b4588db299cf0b316b6b3df664f9791fb79b58e7..42ab856e7435494e75420a7cb2fa226fe11b9422 100644 --- a/tests/test_clustering/test_a.py +++ b/tests/test_clustering/test_a.py @@ -26,7 +26,9 @@ class TestA(unittest.TestCase): self.c.start(timeout=20) def tearDown(self): + self.assertFalse(self.c.is_shutdown()) self.c.shutdown() + self.assertTrue(self.c.is_shutdown()) def test_properties(self): self.assertEqual(self.c.properties.properties['product'], 'RabbitMQ')