diff --git a/coolamqp/clustering/cluster.py b/coolamqp/clustering/cluster.py
index 43318acb0dc9eac81eabd54353bac494978b228f..c1670f303fd154af998a1a725be7e8ab8b5be0de 100644
--- a/coolamqp/clustering/cluster.py
+++ b/coolamqp/clustering/cluster.py
@@ -86,6 +86,9 @@ class Cluster(object):
         self.attache_group = None       # type: AttacheGroup
         self.events = None              # type: six.moves.queue.Queue
         self.snr = None                 # type: SingleNodeReconnector
+        self.pub_tr = None              # type: Publisher
+        self.pub_na = None              # type: Publisher
+        self.decl = None                # type: Declarer
 
         if on_fail is not None:
             def decorated():
@@ -133,7 +136,7 @@ class Cluster(object):
         Return an Event.
 
         :param timeout: time to wait for an event. 0 means return immediately. None means block forever
-        :para span: optional parent span, if opentracing is installed
+        :param span: optional parent span, if opentracing is installed
         :return: an Event instance. NothingMuch is returned when there's nothing within a given timoeout
         """
 
@@ -162,7 +165,7 @@ class Cluster(object):
             return fetch()
 
     def consume(self, queue, on_message=None, span=None, *args, **kwargs):
-        # type: (Queue, tp.Callable[[MessageReceived], None] -> tp.Tuple[Consumer, Future]
+        # type: (Queue, tp.Callable[[MessageReceived], None]) -> tp.Tuple[Consumer, Future]
         """
         Start consuming from a queue.
 
@@ -185,7 +188,7 @@ class Cluster(object):
         fut = Future()
         fut.set_running_or_notify_cancel()  # it's running right now
         on_message = on_message or (
-            lambda rmsg: self.events.put_nowait(MessageReceived(rmsg)))
+            lambda msg: self.events.put_nowait(MessageReceived(msg)))
         con = Consumer(queue, on_message, future_to_notify=fut, span=span, *args,
                        **kwargs)
         self.attache_group.add(con)
@@ -261,10 +264,11 @@ class Cluster(object):
             tx = False
 
         try:
-            return (self.pub_tr if tx else self.pub_na).publish(message,
-                                                                exchange,
-                                                                routing_key,
-                                                                span)
+            if tx:
+                clb = self.pub_tr
+            else:
+                clb = self.pub_na
+            return clb.publish(message, exchange, routing_key, span)
         except Publisher.UnusablePublisher:
             raise NotImplementedError(
                 u'Sorry, this functionality is not yet implemented!')