diff --git a/coolamqp/attaches/consumer.py b/coolamqp/attaches/consumer.py index f0da62eead8850d1f49476df4cf57e6bc230454a..fdc33b46e0a1c2c461155b26eea0f4a865f35c78 100644 --- a/coolamqp/attaches/consumer.py +++ b/coolamqp/attaches/consumer.py @@ -285,7 +285,7 @@ class Consumer(Channeler): elif isinstance(payload, QueueDeclareOk): # did we need an anonymous name? if self.queue.anonymous: - self.queue.name = payload.queue_name.tobytes() + self.queue.name = payload.queue.tobytes() # We need any form of binding. if self.queue.exchange is not None: diff --git a/coolamqp/clustering/cluster.py b/coolamqp/clustering/cluster.py index 1834eb838c959905e5ce839abf906fe917f76f51..e44608a639e6e9a0f5c25dbf2bde33aa2eead585 100644 --- a/coolamqp/clustering/cluster.py +++ b/coolamqp/clustering/cluster.py @@ -48,24 +48,8 @@ class Cluster(object): if len(nodes) > 1: raise NotImplementedError(u'Multiple nodes not supported yet') - self.listener = ListenerThread() self.node, = nodes - self.attache_group = AttacheGroup() - - self.events = six.moves.queue.Queue() # for coolamqp.clustering.events.* - - self.snr = SingleNodeReconnector(self.node, self.attache_group, self.listener) - self.snr.on_fail.add(lambda: self.events.put_nowait(ConnectionLost())) - - # Spawn a transactional publisher and a noack publisher - self.pub_tr = Publisher(Publisher.MODE_CNPUB) - self.pub_na = Publisher(Publisher.MODE_NOACK) - self.decl = Declarer() - - self.attache_group.add(self.pub_tr) - self.attache_group.add(self.pub_na) - self.attache_group.add(self.decl) def declare(self, obj, persistent=False): """ @@ -142,9 +126,33 @@ class Cluster(object): def start(self, wait=True): """ - Connect to broker. + Connect to broker. Initialize Cluster. + + Only after this call is Cluster usable. + It is not safe to fork after this. + :param wait: block until connection is ready """ + self.listener = ListenerThread() + + self.attache_group = AttacheGroup() + + self.events = six.moves.queue.Queue() # for coolamqp.clustering.events.* + + self.snr = SingleNodeReconnector(self.node, self.attache_group, self.listener) + self.snr.on_fail.add(lambda: self.events.put_nowait(ConnectionLost())) + + # Spawn a transactional publisher and a noack publisher + self.pub_tr = Publisher(Publisher.MODE_CNPUB) + self.pub_na = Publisher(Publisher.MODE_NOACK) + self.decl = Declarer() + + self.attache_group.add(self.pub_tr) + self.attache_group.add(self.pub_na) + self.attache_group.add(self.decl) + + + self.listener.init() self.listener.start() self.snr.connect() diff --git a/coolamqp/uplink/listener/thread.py b/coolamqp/uplink/listener/thread.py index 887ecba5c533b48ee8b98705b6d50ddd9ec37fae..013bd23db0d3fbb168accd51bb7ccc1c574a7202 100644 --- a/coolamqp/uplink/listener/thread.py +++ b/coolamqp/uplink/listener/thread.py @@ -21,8 +21,11 @@ class ListenerThread(threading.Thread): def terminate(self): self.terminating = True - def run(self): + def init(self): + """Called before start. It is not safe to fork after this""" self.listener = EpollListener() + + def run(self): while not self.terminating: self.listener.wait(timeout=1) self.listener.shutdown() diff --git a/tests/test_clustering/test_a.py b/tests/test_clustering/test_a.py index 67ebff937b88bea4b93acd7049a713a7a75af6fb..6a96449e043205dcb9e75476c4249a1125291407 100644 --- a/tests/test_clustering/test_a.py +++ b/tests/test_clustering/test_a.py @@ -6,7 +6,7 @@ from __future__ import print_function, absolute_import, division import six import unittest import time, logging, threading -from coolamqp.objects import Message, MessageProperties, NodeDefinition, Queue, ReceivedMessage +from coolamqp.objects import Message, MessageProperties, NodeDefinition, Queue, ReceivedMessage, Exchange from coolamqp.clustering import Cluster, MessageReceived, NothingMuch import time @@ -45,6 +45,13 @@ class TestA(unittest.TestCase): self.assertEquals(con.qos, (0, 110)) + def test_anonymq(self): + q = Queue(exchange=Exchange(u'ooo', type=b'fanout', auto_delete=True), auto_delete=True) + + c, f = self.c.consume(q) + + f.result() + def test_send_recv_zerolen(self): P = {'q': False}