From 2f932fef19eca6fb6e54582ec616775ad2b396a6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Ma=C5=9Blanka?= <piotr.maslanka@henrietta.com.pl> Date: Thu, 9 Jul 2020 17:46:02 +0200 Subject: [PATCH] fix the tests --- coolamqp/clustering/cluster.py | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/coolamqp/clustering/cluster.py b/coolamqp/clustering/cluster.py index 43318ac..c1670f3 100644 --- a/coolamqp/clustering/cluster.py +++ b/coolamqp/clustering/cluster.py @@ -86,6 +86,9 @@ class Cluster(object): self.attache_group = None # type: AttacheGroup self.events = None # type: six.moves.queue.Queue self.snr = None # type: SingleNodeReconnector + self.pub_tr = None # type: Publisher + self.pub_na = None # type: Publisher + self.decl = None # type: Declarer if on_fail is not None: def decorated(): @@ -133,7 +136,7 @@ class Cluster(object): Return an Event. :param timeout: time to wait for an event. 0 means return immediately. None means block forever - :para span: optional parent span, if opentracing is installed + :param span: optional parent span, if opentracing is installed :return: an Event instance. NothingMuch is returned when there's nothing within a given timoeout """ @@ -162,7 +165,7 @@ class Cluster(object): return fetch() def consume(self, queue, on_message=None, span=None, *args, **kwargs): - # type: (Queue, tp.Callable[[MessageReceived], None] -> tp.Tuple[Consumer, Future] + # type: (Queue, tp.Callable[[MessageReceived], None]) -> tp.Tuple[Consumer, Future] """ Start consuming from a queue. @@ -185,7 +188,7 @@ class Cluster(object): fut = Future() fut.set_running_or_notify_cancel() # it's running right now on_message = on_message or ( - lambda rmsg: self.events.put_nowait(MessageReceived(rmsg))) + lambda msg: self.events.put_nowait(MessageReceived(msg))) con = Consumer(queue, on_message, future_to_notify=fut, span=span, *args, **kwargs) self.attache_group.add(con) @@ -261,10 +264,11 @@ class Cluster(object): tx = False try: - return (self.pub_tr if tx else self.pub_na).publish(message, - exchange, - routing_key, - span) + if tx: + clb = self.pub_tr + else: + clb = self.pub_na + return clb.publish(message, exchange, routing_key, span) except Publisher.UnusablePublisher: raise NotImplementedError( u'Sorry, this functionality is not yet implemented!') -- GitLab