From a311a29257bcfd9bae45ae52fc4d8292eb283995 Mon Sep 17 00:00:00 2001 From: Piotr Maslanka <piotr.maslanka@henrietta.com.pl> Date: Wed, 11 Jan 2017 12:45:25 +0100 Subject: [PATCH] fixed #14, anonymous queue bug fixed --- coolamqp/attaches/consumer.py | 2 +- coolamqp/clustering/cluster.py | 42 ++++++++++++++++++------------ coolamqp/uplink/listener/thread.py | 5 +++- tests/test_clustering/test_a.py | 9 ++++++- 4 files changed, 38 insertions(+), 20 deletions(-) diff --git a/coolamqp/attaches/consumer.py b/coolamqp/attaches/consumer.py index f0da62e..fdc33b4 100644 --- a/coolamqp/attaches/consumer.py +++ b/coolamqp/attaches/consumer.py @@ -285,7 +285,7 @@ class Consumer(Channeler): elif isinstance(payload, QueueDeclareOk): # did we need an anonymous name? if self.queue.anonymous: - self.queue.name = payload.queue_name.tobytes() + self.queue.name = payload.queue.tobytes() # We need any form of binding. if self.queue.exchange is not None: diff --git a/coolamqp/clustering/cluster.py b/coolamqp/clustering/cluster.py index 1834eb8..e44608a 100644 --- a/coolamqp/clustering/cluster.py +++ b/coolamqp/clustering/cluster.py @@ -48,24 +48,8 @@ class Cluster(object): if len(nodes) > 1: raise NotImplementedError(u'Multiple nodes not supported yet') - self.listener = ListenerThread() self.node, = nodes - self.attache_group = AttacheGroup() - - self.events = six.moves.queue.Queue() # for coolamqp.clustering.events.* - - self.snr = SingleNodeReconnector(self.node, self.attache_group, self.listener) - self.snr.on_fail.add(lambda: self.events.put_nowait(ConnectionLost())) - - # Spawn a transactional publisher and a noack publisher - self.pub_tr = Publisher(Publisher.MODE_CNPUB) - self.pub_na = Publisher(Publisher.MODE_NOACK) - self.decl = Declarer() - - self.attache_group.add(self.pub_tr) - self.attache_group.add(self.pub_na) - self.attache_group.add(self.decl) def declare(self, obj, persistent=False): """ @@ -142,9 +126,33 @@ class Cluster(object): def start(self, wait=True): """ - Connect to broker. + Connect to broker. Initialize Cluster. + + Only after this call is Cluster usable. + It is not safe to fork after this. + :param wait: block until connection is ready """ + self.listener = ListenerThread() + + self.attache_group = AttacheGroup() + + self.events = six.moves.queue.Queue() # for coolamqp.clustering.events.* + + self.snr = SingleNodeReconnector(self.node, self.attache_group, self.listener) + self.snr.on_fail.add(lambda: self.events.put_nowait(ConnectionLost())) + + # Spawn a transactional publisher and a noack publisher + self.pub_tr = Publisher(Publisher.MODE_CNPUB) + self.pub_na = Publisher(Publisher.MODE_NOACK) + self.decl = Declarer() + + self.attache_group.add(self.pub_tr) + self.attache_group.add(self.pub_na) + self.attache_group.add(self.decl) + + + self.listener.init() self.listener.start() self.snr.connect() diff --git a/coolamqp/uplink/listener/thread.py b/coolamqp/uplink/listener/thread.py index 887ecba..013bd23 100644 --- a/coolamqp/uplink/listener/thread.py +++ b/coolamqp/uplink/listener/thread.py @@ -21,8 +21,11 @@ class ListenerThread(threading.Thread): def terminate(self): self.terminating = True - def run(self): + def init(self): + """Called before start. It is not safe to fork after this""" self.listener = EpollListener() + + def run(self): while not self.terminating: self.listener.wait(timeout=1) self.listener.shutdown() diff --git a/tests/test_clustering/test_a.py b/tests/test_clustering/test_a.py index 67ebff9..6a96449 100644 --- a/tests/test_clustering/test_a.py +++ b/tests/test_clustering/test_a.py @@ -6,7 +6,7 @@ from __future__ import print_function, absolute_import, division import six import unittest import time, logging, threading -from coolamqp.objects import Message, MessageProperties, NodeDefinition, Queue, ReceivedMessage +from coolamqp.objects import Message, MessageProperties, NodeDefinition, Queue, ReceivedMessage, Exchange from coolamqp.clustering import Cluster, MessageReceived, NothingMuch import time @@ -45,6 +45,13 @@ class TestA(unittest.TestCase): self.assertEquals(con.qos, (0, 110)) + def test_anonymq(self): + q = Queue(exchange=Exchange(u'ooo', type=b'fanout', auto_delete=True), auto_delete=True) + + c, f = self.c.consume(q) + + f.result() + def test_send_recv_zerolen(self): P = {'q': False} -- GitLab