Skip to content
Snippets Groups Projects
Commit 1db5674e authored by Piotr Maślanka's avatar Piotr Maślanka
Browse files

attache group as an attache

parent dd020f82
No related branches found
No related tags found
No related merge requests found
...@@ -6,7 +6,10 @@ These duties almost always require allocating a channel. A base class - Channele ...@@ -6,7 +6,10 @@ These duties almost always require allocating a channel. A base class - Channele
The attache becomes then responsible for closing this channel. The attache becomes then responsible for closing this channel.
Attache should also register at least one on_fail watch, so it can handle things if they go south. Attache should also register at least one on_fail watch, so it can handle things if they go south.
Multiple attaches can be "abstracted" as single one via AttacheGroup (which is also an Attache)
""" """
from coolamqp.attaches.consumer import Consumer from coolamqp.attaches.consumer import Consumer
from coolamqp.attaches.publisher import Publisher from coolamqp.attaches.publisher import Publisher
from coolamqp.attaches.agroup import AttacheGroup
# coding=UTF-8
"""
This is an attache that attaches multiple attaches.
It evicts cancelled attaches.
"""
from __future__ import print_function, absolute_import, division
import six
import logging
import weakref
logger = logging.getLogger(__name__)
from coolamqp.attaches.channeler import Attache, ST_OFFLINE
from coolamqp.attaches.consumer import Consumer
class AttacheGroup(Attache):
"""
A bunch of attaches
"""
def __init__(self):
super(AttacheGroup, self).__init__()
self.attaches = []
def add(self, attache):
"""
Add an attache to this group.
If this is attached, and connection is ST_ONLINE, .attach() will be called
on this attache at once.
:param attache: Attache instance
"""
assert attache not in self.attaches
print('Adding %s' % (attache, ))
self.attaches.append(attache)
# If we have any connection, and it's not dead, attach
if self.connection is not None and self.connection.state != ST_OFFLINE:
print('Attach to me %s' % (attache, ))
attache.attach(self.connection)
if isinstance(attache, Consumer):
attache.attache_group = self
def on_cancel_customer(self, customer):
"""
Called by a customer, when it's cancelled.
Consumer must have .attache_group set to this. This is done by .add()
:param customer: a Customer instance
"""
self.attaches.remove(customer)
def attach(self, connection):
"""
Attach to a connection
:param connection: Connection instance of any state
"""
super(AttacheGroup, self).attach(connection)
for attache in self.attaches:
if not attache.cancelled:
if attache.connection != connection:
print('Attach to me %s' % (attache, ))
attache.attach(connection)
...@@ -47,6 +47,9 @@ class Consumer(Channeler): ...@@ -47,6 +47,9 @@ class Consumer(Channeler):
self.cancelled = False # did the client want to STOP using this consumer? self.cancelled = False # did the client want to STOP using this consumer?
self.receiver = None # MessageReceiver instance self.receiver = None # MessageReceiver instance
self.attache_group = None # attache group this belongs to.
# if this is not None, then it has an attribute
# on_cancel_customer(Consumer instance)
def cancel(self): def cancel(self):
""" """
...@@ -59,6 +62,9 @@ class Consumer(Channeler): ...@@ -59,6 +62,9 @@ class Consumer(Channeler):
""" """
self.cancelled = True self.cancelled = True
self.method(ChannelClose(0, b'consumer cancelled', 0, 0)) self.method(ChannelClose(0, b'consumer cancelled', 0, 0))
if self.attache_group is not None:
self.attache_group.on_cancel_customer(self)
def on_operational(self, operational): def on_operational(self, operational):
super(Consumer, self).on_operational(operational) super(Consumer, self).on_operational(operational)
...@@ -203,8 +209,6 @@ class Consumer(Channeler): ...@@ -203,8 +209,6 @@ class Consumer(Channeler):
class MessageReceiver(object): class MessageReceiver(object):
"""This is an object that is used to received messages. """This is an object that is used to received messages.
......
...@@ -118,6 +118,5 @@ def _compile_particular_content_property_list_class(zpf, fields): ...@@ -118,6 +118,5 @@ def _compile_particular_content_property_list_class(zpf, fields):
def compile_particular_content_property_list_class(zpf, fields): def compile_particular_content_property_list_class(zpf, fields):
q = _compile_particular_content_property_list_class(zpf, fields) q = _compile_particular_content_property_list_class(zpf, fields)
logger.debug('Compiling\n%s', q) logger.debug('Compiling\n%s', q)
print(q)
exec(q) exec(q)
return ParticularContentTypeList return ParticularContentTypeList
...@@ -5,13 +5,13 @@ import time, logging, threading ...@@ -5,13 +5,13 @@ import time, logging, threading
from coolamqp.objects import Message, MessageProperties, NodeDefinition from coolamqp.objects import Message, MessageProperties, NodeDefinition
from coolamqp.uplink import Connection from coolamqp.uplink import Connection
from coolamqp.attaches import Consumer, Publisher from coolamqp.attaches import Consumer, Publisher, AttacheGroup
from coolamqp.objects import Queue from coolamqp.objects import Queue
import time import time
NODE = NodeDefinition('127.0.0.1', 'user', 'user', heartbeat=20) NODE = NodeDefinition('127.0.0.1', 'user', 'user', heartbeat=20)
logging.basicConfig(level=logging.DEBUG) logging.basicConfig(level=logging.INFO)
if __name__ == '__main__': if __name__ == '__main__':
lt = ListenerThread() lt = ListenerThread()
...@@ -20,29 +20,29 @@ if __name__ == '__main__': ...@@ -20,29 +20,29 @@ if __name__ == '__main__':
con = Connection(NODE, lt) con = Connection(NODE, lt)
con.start() con.start()
cons = Consumer(Queue('siema-eniu'), no_ack=True) ag = AttacheGroup()
cons.attach(con)
ag.add(Consumer(Queue('siema-eniu'), no_ack=True))
class IPublishThread(threading.Thread): class IPublishThread(threading.Thread):
def __init__(self): def __init__(self, ag):
super(IPublishThread, self).__init__() super(IPublishThread, self).__init__()
self.ag = ag
self.daemon = True self.daemon = True
def run(self): def run(self):
pub2 = Publisher(Publisher.MODE_NOACK) pub2 = Publisher(Publisher.MODE_NOACK)
pub2.attach(con) self.ag.add(pub2)
while True: while True:
pub2.publish(Message(b'you dawg', properties=MessageProperties(content_type='text/plain')), pub2.publish(Message(b'you dawg', properties=MessageProperties(content_type='text/plain')),
routing_key=b'siema-eniu') routing_key=b'siema-eniu')
ag.attach(con)
IPublishThread().start() IPublishThread(ag).start()
while True: while True:
time.sleep(30) time.sleep(30)
if not cons.cancelled:
cons.cancel()
lt.terminate() lt.terminate()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment