diff --git a/coolamqp/attaches/__init__.py b/coolamqp/attaches/__init__.py index 06520e35eddd52923316c2b2fc674f967ebad076..9af36c4ba750393136ccd92ff4feb6df1cb22a9b 100644 --- a/coolamqp/attaches/__init__.py +++ b/coolamqp/attaches/__init__.py @@ -6,7 +6,10 @@ These duties almost always require allocating a channel. A base class - Channele The attache becomes then responsible for closing this channel. Attache should also register at least one on_fail watch, so it can handle things if they go south. + +Multiple attaches can be "abstracted" as single one via AttacheGroup (which is also an Attache) """ from coolamqp.attaches.consumer import Consumer from coolamqp.attaches.publisher import Publisher +from coolamqp.attaches.agroup import AttacheGroup diff --git a/coolamqp/attaches/agroup.py b/coolamqp/attaches/agroup.py new file mode 100644 index 0000000000000000000000000000000000000000..28c498329614a41cccfa462620234d8727dfa121 --- /dev/null +++ b/coolamqp/attaches/agroup.py @@ -0,0 +1,73 @@ +# coding=UTF-8 +""" +This is an attache that attaches multiple attaches. + +It evicts cancelled attaches. +""" +from __future__ import print_function, absolute_import, division +import six +import logging +import weakref + +logger = logging.getLogger(__name__) + + +from coolamqp.attaches.channeler import Attache, ST_OFFLINE +from coolamqp.attaches.consumer import Consumer + + +class AttacheGroup(Attache): + """ + A bunch of attaches + """ + + def __init__(self): + super(AttacheGroup, self).__init__() + self.attaches = [] + + def add(self, attache): + """ + Add an attache to this group. + + If this is attached, and connection is ST_ONLINE, .attach() will be called + on this attache at once. + + :param attache: Attache instance + """ + assert attache not in self.attaches + print('Adding %s' % (attache, )) + self.attaches.append(attache) + + # If we have any connection, and it's not dead, attach + if self.connection is not None and self.connection.state != ST_OFFLINE: + print('Attach to me %s' % (attache, )) + attache.attach(self.connection) + + if isinstance(attache, Consumer): + attache.attache_group = self + + def on_cancel_customer(self, customer): + """ + Called by a customer, when it's cancelled. + + Consumer must have .attache_group set to this. This is done by .add() + + :param customer: a Customer instance + """ + self.attaches.remove(customer) + + def attach(self, connection): + """ + Attach to a connection + + :param connection: Connection instance of any state + """ + super(AttacheGroup, self).attach(connection) + + for attache in self.attaches: + if not attache.cancelled: + if attache.connection != connection: + print('Attach to me %s' % (attache, )) + attache.attach(connection) + + diff --git a/coolamqp/attaches/consumer.py b/coolamqp/attaches/consumer.py index 8c75efcc9e238758e21a72418a4a44c4965e1cc8..763bc1344104e28bb3145896ec663ed5943006b6 100644 --- a/coolamqp/attaches/consumer.py +++ b/coolamqp/attaches/consumer.py @@ -47,6 +47,9 @@ class Consumer(Channeler): self.cancelled = False # did the client want to STOP using this consumer? self.receiver = None # MessageReceiver instance + self.attache_group = None # attache group this belongs to. + # if this is not None, then it has an attribute + # on_cancel_customer(Consumer instance) def cancel(self): """ @@ -59,6 +62,9 @@ class Consumer(Channeler): """ self.cancelled = True self.method(ChannelClose(0, b'consumer cancelled', 0, 0)) + if self.attache_group is not None: + self.attache_group.on_cancel_customer(self) + def on_operational(self, operational): super(Consumer, self).on_operational(operational) @@ -203,8 +209,6 @@ class Consumer(Channeler): - - class MessageReceiver(object): """This is an object that is used to received messages. diff --git a/coolamqp/framing/compilation/content_property.py b/coolamqp/framing/compilation/content_property.py index 74b4776be0cc8f272e628b0431ba94510302f831..73732e443079dd9fa83f59b1d7679ea3f2ef1c36 100644 --- a/coolamqp/framing/compilation/content_property.py +++ b/coolamqp/framing/compilation/content_property.py @@ -118,6 +118,5 @@ def _compile_particular_content_property_list_class(zpf, fields): def compile_particular_content_property_list_class(zpf, fields): q = _compile_particular_content_property_list_class(zpf, fields) logger.debug('Compiling\n%s', q) - print(q) exec(q) return ParticularContentTypeList diff --git a/tests/run.py b/tests/run.py index 522e190813a13148d5a8245c38b61c4c8f4f6772..9ed4a25c513d6ca085e764a7c3a5bf29426408f6 100644 --- a/tests/run.py +++ b/tests/run.py @@ -5,13 +5,13 @@ import time, logging, threading from coolamqp.objects import Message, MessageProperties, NodeDefinition from coolamqp.uplink import Connection -from coolamqp.attaches import Consumer, Publisher +from coolamqp.attaches import Consumer, Publisher, AttacheGroup from coolamqp.objects import Queue import time NODE = NodeDefinition('127.0.0.1', 'user', 'user', heartbeat=20) -logging.basicConfig(level=logging.DEBUG) +logging.basicConfig(level=logging.INFO) if __name__ == '__main__': lt = ListenerThread() @@ -20,29 +20,29 @@ if __name__ == '__main__': con = Connection(NODE, lt) con.start() - cons = Consumer(Queue('siema-eniu'), no_ack=True) - cons.attach(con) + ag = AttacheGroup() + + ag.add(Consumer(Queue('siema-eniu'), no_ack=True)) class IPublishThread(threading.Thread): - def __init__(self): + def __init__(self, ag): super(IPublishThread, self).__init__() + self.ag = ag self.daemon = True def run(self): pub2 = Publisher(Publisher.MODE_NOACK) - pub2.attach(con) + self.ag.add(pub2) while True: pub2.publish(Message(b'you dawg', properties=MessageProperties(content_type='text/plain')), routing_key=b'siema-eniu') + ag.attach(con) - IPublishThread().start() + IPublishThread(ag).start() while True: time.sleep(30) - if not cons.cancelled: - cons.cancel() - lt.terminate()