diff --git a/coolamqp/attaches/__init__.py b/coolamqp/attaches/__init__.py index 9475c8e8cf4686ec316f993dbf301602018d76f8..06520e35eddd52923316c2b2fc674f967ebad076 100644 --- a/coolamqp/attaches/__init__.py +++ b/coolamqp/attaches/__init__.py @@ -2,10 +2,11 @@ from __future__ import absolute_import, division, print_function """ Attaches are components that attach to an coolamqp.uplink.Connection and perform some duties -These duties almost require allocating a channel. The attache becomes then responsible for closing this channel. +These duties almost always require allocating a channel. A base class - Channeler - is provided to faciliate that. +The attache becomes then responsible for closing this channel. Attache should also register at least one on_fail watch, so it can handle things if they go south. """ from coolamqp.attaches.consumer import Consumer -from coolamqp.attaches.publisher import Publisher, MODE_NOACK, MODE_CNPUB +from coolamqp.attaches.publisher import Publisher diff --git a/coolamqp/attaches/publisher.py b/coolamqp/attaches/publisher.py index 453164a2bd7f75f83969c650e5c4da48538df871..5f08120e24c26d9507b67aed242680db4be34a64 100644 --- a/coolamqp/attaches/publisher.py +++ b/coolamqp/attaches/publisher.py @@ -31,9 +31,6 @@ from coolamqp.objects import Future logger = logging.getLogger(__name__) -MODE_NOACK = 0 -MODE_CNPUB = 1 # this will be available only if suitable extensions were used - # for holding messages when MODE_CNPUB and link is down CnpubMessageSendOrder = collections.namedtuple('CnpubMessageSendOrder', ('message', 'exchange_name', @@ -55,6 +52,9 @@ class Publisher(Channeler): Other modes may be added in the future. """ + MODE_NOACK = 0 # no-ack publishing + MODE_CNPUB = 1 # RabbitMQ publisher confirms extension + def __init__(self, mode): """ @@ -67,7 +67,7 @@ class Publisher(Channeler): :raise ValueError: mode invalid """ super(Publisher, self).__init__() - if mode not in (MODE_NOACK, MODE_CNPUB): + if mode not in (Publisher.MODE_NOACK, Publisher.MODE_CNPUB): raise ValueError(u'Invalid publisher mode') self.mode = mode @@ -112,7 +112,7 @@ class Publisher(Channeler): To be used when mode is MODE_CNPUB and we just got ST_ONLINE """ assert self.state == ST_ONLINE - assert self.mode == MODE_CNPUB + assert self.mode == Publisher.MODE_CNPUB while len(self.messages) > 0: msg, xchg, rk, fut = self.messages.popleft() @@ -129,7 +129,7 @@ class Publisher(Channeler): """ This gets called on BasicAck and BasicNack, if mode is MODE_CNPUB """ - assert self.mode == MODE_CNPUB + assert self.mode == Publisher.MODE_CNPUB print('Got %s with dt=%s' % (payload, payload.delivery_tag)) @@ -158,14 +158,14 @@ class Publisher(Channeler): :return: a Future instance, or None """ # Formulate the request - if self.mode == MODE_NOACK: + if self.mode == Publisher.MODE_NOACK: # If we are not connected right now, drop the message on the floor and log it with DEBUG if self.state != ST_ONLINE: logger.debug(u'Publish request, but not connected - dropping the message') else: self._pub(message, exchange_name, routing_key) - elif self.mode == MODE_CNPUB: + elif self.mode == Publisher.MODE_CNPUB: fut = Future() #todo can optimize this not to create an object if ST_ONLINE already @@ -182,7 +182,7 @@ class Publisher(Channeler): def on_setup(self, payload): # Assert that mode is OK - if self.mode == MODE_CNPUB: + if self.mode == Publisher.MODE_CNPUB: if PUBLISHER_CONFIRMS not in self.connection.extensions: warnings.warn(u'Broker does not support publisher_confirms, refusing to start publisher', RuntimeWarning) @@ -193,14 +193,14 @@ class Publisher(Channeler): # Ok, if this has a mode different from MODE_NOACK, we need to additionally set up # the functionality. - if self.mode == MODE_CNPUB: + if self.mode == Publisher.MODE_CNPUB: self.method_and_watch(ConfirmSelect(False), ConfirmSelectOk, self.on_setup) - elif self.mode == MODE_NOACK: + elif self.mode == Publisher.MODE_NOACK: # A-OK! Boot it. self.state = ST_ONLINE self.on_operational(True) - elif self.mode == MODE_CNPUB: + elif self.mode == Publisher.MODE_CNPUB: # Because only in this case it makes sense to check for MODE_CNPUB if isinstance(payload, ConfirmSelectOk): # A-OK! Boot it. diff --git a/tests/run.py b/tests/run.py index 6ad963bef03cf1542f565cc63baa863b6f77e517..247c23a3cd2e0f90c8b0456367823d29872d4925 100644 --- a/tests/run.py +++ b/tests/run.py @@ -5,7 +5,7 @@ import time, logging, threading from coolamqp.objects import Message, MessageProperties, NodeDefinition from coolamqp.uplink import Connection -from coolamqp.attaches import Consumer, Publisher, MODE_NOACK, MODE_CNPUB +from coolamqp.attaches import Consumer, Publisher from coolamqp.objects import Queue import time @@ -30,7 +30,7 @@ if __name__ == '__main__': self.daemon = True def run(self): - pub2 = Publisher(MODE_NOACK) + pub2 = Publisher(Publisher.MODE_NOACK) pub2.attach(con) while True: pub2.publish(Message(b'you dawg'), routing_key=b'siema-eniu')