From cecf58404831e4ed390c112dd9c890878d0740e9 Mon Sep 17 00:00:00 2001 From: Piotr Maslanka <piotr.maslanka@henrietta.com.pl> Date: Sun, 24 Dec 2017 06:24:53 +0100 Subject: [PATCH] fixes #23 --- coolamqp/attaches/publisher.py | 30 ++++++++++++++---------------- 1 file changed, 14 insertions(+), 16 deletions(-) diff --git a/coolamqp/attaches/publisher.py b/coolamqp/attaches/publisher.py index 0b4d7c0..3d75b9d 100644 --- a/coolamqp/attaches/publisher.py +++ b/coolamqp/attaches/publisher.py @@ -151,6 +151,7 @@ class Publisher(Channeler, Synchronized): """ assert self.state == ST_ONLINE assert self.mode == Publisher.MODE_CNPUB + assert self.tagger is not None while len(self.messages) > 0: try: @@ -265,20 +266,17 @@ class Publisher(Channeler, Synchronized): self.state = ST_ONLINE self.on_operational(True) - elif self.mode == Publisher.MODE_CNPUB: + elif (self.mode == Publisher.MODE_CNPUB) and isinstance(payload, ConfirmSelectOk): # Because only in this case it makes sense to check for MODE_CNPUB - - if isinstance(payload, ConfirmSelectOk): - # A-OK! Boot it. - self.state = ST_ONLINE - self.on_operational(True) - - self.tagger = AtomicTagger() - - # now we need to listen for BasicAck and BasicNack - - mw = MethodWatch(self.channel_id, (BasicAck, BasicNack), - self._on_cnpub_delivery) - mw.oneshot = False - self.connection.watch(mw) - self._mode_cnpub_process_deliveries() + # A-OK! Boot it. + self.tagger = AtomicTagger() + self.state = ST_ONLINE + self.on_operational(True) + + # now we need to listen for BasicAck and BasicNack + + mw = MethodWatch(self.channel_id, (BasicAck, BasicNack), + self._on_cnpub_delivery) + mw.oneshot = False + self.connection.watch(mw) + self._mode_cnpub_process_deliveries() -- GitLab