diff --git a/coolamqp/attaches/channeler.py b/coolamqp/attaches/channeler.py index 30849434deaba71175cf145b62a982a2674572b9..a892b751dac87184c7eeaada419bf44238cd6732 100644 --- a/coolamqp/attaches/channeler.py +++ b/coolamqp/attaches/channeler.py @@ -11,13 +11,15 @@ from coolamqp.framing.definitions import ChannelOpen, ChannelOpenOk, BasicConsum QueueBind, QueueBindOk, ChannelClose, ChannelCloseOk, BasicCancel, BasicDeliver, \ BasicAck, BasicReject, ACCESS_REFUSED, RESOURCE_LOCKED, BasicCancelOk from coolamqp.uplink import HeaderOrBodyWatch, MethodWatch - +import logging ST_OFFLINE = 0 # Consumer is *not* consuming, no setup attempts are being made ST_SYNCING = 1 # A process targeted at consuming has been started ST_ONLINE = 2 # Consumer is declared all right +logger = logging.getLogger(__name__) + class Attache(object): """ @@ -136,7 +138,10 @@ class Channeler(Attache): self.connection = None self.channel_id = None - print(self, 'pwned') + print(self, 'pwned with', payload) + + if isinstance(payload, ChannelClose): + logger.debug('Channel closed: %s %s', payload.reply_code, payload.reply_text) def methods(self, payloads): """ diff --git a/coolamqp/attaches/consumer.py b/coolamqp/attaches/consumer.py index 71b3d5114462bd691ea0ab05e2282c0233b5268a..bee5f12ed7151017bcb632110380f3b682d60e66 100644 --- a/coolamqp/attaches/consumer.py +++ b/coolamqp/attaches/consumer.py @@ -1,14 +1,19 @@ # coding=UTF-8 from __future__ import absolute_import, division, print_function import six +import logging from coolamqp.framing.frames import AMQPBodyFrame, AMQPHeaderFrame from coolamqp.framing.definitions import ChannelOpenOk, BasicConsume, \ BasicConsumeOk, QueueDeclare, QueueDeclareOk, ExchangeDeclare, ExchangeDeclareOk, \ QueueBind, QueueBindOk, ChannelClose, BasicCancel, BasicDeliver, \ - BasicAck, BasicReject, ACCESS_REFUSED, RESOURCE_LOCKED, BasicCancelOk, BasicQos + BasicAck, BasicReject, RESOURCE_LOCKED, BasicCancelOk, BasicQos, HARD_ERROR from coolamqp.uplink import HeaderOrBodyWatch, MethodWatch from coolamqp.attaches.channeler import Channeler, ST_ONLINE, ST_OFFLINE +from coolamqp.exceptions import ResourceLocked, AMQPError + + +logger = logging.getLogger(__name__) class Consumer(Channeler): @@ -28,23 +33,41 @@ class Consumer(Channeler): Since this implies cancelling the consumer, here you go. """ - def __init__(self, queue, no_ack=True, qos=None, dont_pause=False, - future_to_notify=None + def __init__(self, queue, on_message, no_ack=True, qos=None, cancel_on_failure=False, + future_to_notify=None, + fail_on_first_time_resource_locked=False ): """ - :param state: state of the consumer :param queue: Queue object, being consumed from right now. Note that name of anonymous queue might change at any time! + :param on_message: callable that will process incoming messages + :type on_message: callable(ReceivedMessage instance) :param no_ack: Will this consumer require acknowledges from messages? :param qos: a tuple of (prefetch size, prefetch window) for this consumer :type qos: tuple(int, int) or tuple(None, int) - :param dont_pause: Consumer will fail on the spot instead of pausing + :param cancel_on_failure: Consumer will cancel itself when link goes down + :type cancel_on_failure: bool + :param future_to_notify: Future to succeed when this consumer goes online for the first time. + This future can also raise with: + AMQPError - a HARD_ERROR (see AMQP spec) was encountered + ResourceLocked - this was the first declaration, and + fail_on_first_time_resource_locked was set + :param fail_on_first_time_resource_locked: When consumer is declared for the first time, + and RESOURCE_LOCKED is encountered, it will fail the + future with ResourceLocked, and consumer will cancel itself. + By default it will retry until success is made. + If the consumer doesn't get the chance to be declared - because + of a connection fail - next reconnect will consider this to be + SECOND declaration, ie. it will retry ad infinitum + :type fail_on_first_time_resource_locked: bool """ super(Consumer, self).__init__() self.queue = queue self.no_ack = no_ack + self.on_message = on_message + # private self.cancelled = False # did the client want to STOP using this consumer? self.receiver = None # MessageReceiver instance @@ -58,6 +81,10 @@ class Consumer(Channeler): self.qos = qos self.qos_update_sent = False # QoS was not sent to server + self.future_to_notify = future_to_notify + self.fail_on_first_time_resource_locked = fail_on_first_time_resource_locked + self.cancel_on_failure = cancel_on_failure + def set_qos(self, prefetch_size, prefetch_count): """ Set new QoS for this consumer. @@ -90,6 +117,12 @@ class Consumer(Channeler): if operational: assert self.receiver is None self.receiver = MessageReceiver(self) + + # notify the future + if self.future_to_notify is not None: + self.future_to_notify.set_result() + self.future_to_notify = None + else: self.receiver.on_gone() self.receiver = None @@ -106,6 +139,11 @@ class Consumer(Channeler): Note, this can be called multiple times, and eventually with None. """ + + if self.cancel_on_failure: + logger.debug('Consumer is cancel_on_failure and failure seen, cancelling') + self.cancel() + if self.state == ST_ONLINE: # The channel has just lost operationality! self.on_operational(False) @@ -124,17 +162,40 @@ class Consumer(Channeler): return if isinstance(payload, ChannelClose): - if payload.reply_code in (ACCESS_REFUSED, RESOURCE_LOCKED): + rc = payload.reply_code + if rc == RESOURCE_LOCKED: + # special handling + # This is because we might be reconnecting, and the broker doesn't know yet that we are dead. + # it won't release our exclusive channels, and that's why we'll get RESOURCE_LOCKED. + + if self.fail_on_first_time_resource_locked: + # still, a RESOURCE_LOCKED on a first declaration ever suggests something is very wrong + if self.future_to_notify: + self.future_to_notify.set_exception(ResourceLocked(payload)) + self.future_to_notify = None + self.cancel() + should_retry = True + elif rc in HARD_ERROR: + logger.warn('Channel closed due to hard error, %s: %s', payload.reply_code, payload.reply_text) + if self.future_to_notify: + self.future_to_notify.set_exception(AMQPError(payload)) + self.future_to_notify = None + # We might not want to throw the connection away. should_retry = should_retry and (not self.cancelled) - super(Consumer, self).on_close(payload) + old_con = self.connection + super(Consumer, self).on_close(payload) # this None's self.connection + self.fail_on_first_time_resource_locked = False - #todo retry on access denied + if should_retry: + if old_con.state == ST_ONLINE: + logger.info('Retrying with %s', self.queue.name) + self.attach(old_con) def on_delivery(self, sth): """ @@ -234,8 +295,8 @@ class Consumer(Channeler): self.on_operational(True) # resend QoS, in case of sth - self.set_qos(self.qos[0], self.qos[1]) - + if self.qos is not None: + self.set_qos(self.qos[0], self.qos[1]) class MessageReceiver(object): @@ -336,12 +397,7 @@ class MessageReceiver(object): None if self.consumer.no_ack else self.confirm(self.bdeliver.delivery_tag, False), ) -# print('hello seal - %s\nctype: %s\ncencod: %s\n' % (rm.body, -# rm.properties.__dict__.get('content_type', b'<EMPTY>'), -# rm.properties.__dict__.get('content_encoding', b'<EMPTY>'))) - - if ack_expected: - rm.ack() + self.consumer.on_message(rm) self.state = 0 diff --git a/coolamqp/attaches/publisher.py b/coolamqp/attaches/publisher.py index 28ec95ff0a0aa5785917758bdb32d6d44c45de28..147df0fc5705a95d3700f42fafbc1da8879bb7d1 100644 --- a/coolamqp/attaches/publisher.py +++ b/coolamqp/attaches/publisher.py @@ -59,9 +59,12 @@ class Publisher(Channeler, Synchronized): """ MODE_NOACK = 0 # no-ack publishing MODE_CNPUB = 1 # RabbitMQ publisher confirms extension - #todo add fallback using plain AMQP transactions + #todo add fallback using plain AMQP transactions - this will remove UnusablePublisher and stuff + class UnusablePublisher(Exception): + """This publisher will never work (eg. MODE_CNPUB on a broker not supporting publisher confirms)""" + def __init__(self, mode): """ Create a new publisher @@ -85,6 +88,8 @@ class Publisher(Channeler, Synchronized): self.tagger = None # None, or AtomicTagger instance id MODE_CNPUB + self.critically_failed = False + @Synchronized.synchronized def attach(self, connection): Channeler.attach(self, connection) @@ -177,6 +182,7 @@ class Publisher(Channeler, Synchronized): :param exchange_name: exchange name to use. Default direct exchange by default :param routing_key: routing key to use :return: a Future instance, or None + :raise Publisher.UnusablePublisher: this publisher will never work (eg. MODE_CNPUB on Non-RabbitMQ) """ # Formulate the request if self.mode == Publisher.MODE_NOACK: @@ -208,6 +214,7 @@ class Publisher(Channeler, Synchronized): warnings.warn(u'Broker does not support publisher_confirms, refusing to start publisher', RuntimeWarning) self.state = ST_OFFLINE + self.critically_failed = True return if isinstance(payload, ChannelOpenOk): diff --git a/coolamqp/clustering/__init__.py b/coolamqp/clustering/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..337b30fa20106d995704f7210238884b0fe5c5a8 --- /dev/null +++ b/coolamqp/clustering/__init__.py @@ -0,0 +1,13 @@ +# coding=UTF-8 +""" +This is the layer that you talk to. It abstracts away one (in future - more) connections +to broker with an uniform interface. +""" +from __future__ import print_function, absolute_import, division + +import logging + +logger = logging.getLogger(__name__) + + +from coolamqp.clustering.cluster import Cluster diff --git a/coolamqp/clustering/cluster.py b/coolamqp/clustering/cluster.py new file mode 100644 index 0000000000000000000000000000000000000000..9d170b17679ab9a694c4078b3b73d60422a55e93 --- /dev/null +++ b/coolamqp/clustering/cluster.py @@ -0,0 +1,131 @@ +# coding=UTF-8 +""" +THE object you interface with +""" +from __future__ import print_function, absolute_import, division +import six +import logging +import warnings +import time +from coolamqp.uplink import ListenerThread +from coolamqp.clustering.single import SingleNodeReconnector +from coolamqp.attaches import Publisher, AttacheGroup, Consumer +from coolamqp.objects import Future, Exchange +from six.moves.queue import Queue + + +logger = logging.getLogger(__name__) + + +class Cluster(object): + """ + Frontend for your AMQP needs. + + This has ListenerThread. + + Call .start() to connect to AMQP. + """ + + # Events you can be informed about + ST_LINK_LOST = 0 # Link has been lost + ST_LINK_REGAINED = 1 # Link has been regained + + + def __init__(self, nodes): + """ + :param nodes: list of nodes, or a single node. For now, only one is supported. + :type nodes: NodeDefinition instance or a list of NodeDefinition instances + """ + from coolamqp.objects import NodeDefinition + if isinstance(nodes, NodeDefinition): + nodes = [nodes] + + if len(nodes) > 1: + raise NotImplementedError(u'Multiple nodes not supported yet') + + self.listener = ListenerThread() + self.node, = nodes + + self.attache_group = AttacheGroup() + + self.snr = SingleNodeReconnector(self.node, self.attache_group, self.listener) + + # Spawn a transactional publisher and a noack publisher + self.pub_tr = Publisher(Publisher.MODE_CNPUB) + self.pub_na = Publisher(Publisher.MODE_NOACK) + + self.attache_group.add(self.pub_tr) + self.attache_group.add(self.pub_na) + + self.events = Queue() # for + + def consume(self, queue, on_message=None, *args, **kwargs): + """ + Start consuming from a queue. + + args and kwargs will be passed to Consumer constructor (coolamqp.attaches.consumer.Consumer). + Don't use future_to_notify - it's done here! + + Take care not to lose the Consumer object - it's the only way to cancel a consumer! + + :param queue: Queue object, being consumed from right now. + Note that name of anonymous queue might change at any time! + :param on_message: callable that will process incoming messages + if you leave it at None, messages will be .put into self.events + :type on_message: callable(ReceivedMessage instance) or None + :return: a tuple (Consumer instance, and a Future), that tells, when consumer is ready + """ + fut = Future() + on_message = on_message or self.events.put_nowait + con = Consumer(queue, on_message, future_to_notify=fut, *args, **kwargs) + self.attache_group.add(con) + return con, fut + + def publish(self, message, exchange=None, routing_key=u'', tx=False): + """ + Publish a message. + + :param message: Message to publish + :param exchange: exchange to use. Default is the "direct" empty-name exchange. + :type exchange: unicode/bytes (exchange name) or Exchange object. + :param routing_key: routing key to use + :param tx: Whether to publish it transactionally. + If you choose so, you will receive a Future that can be used + to check it broker took responsibility for this message. + :return: Future or None + """ + + publisher = (self.pub_tr if tx else self.pub_na) + + if isinstance(exchange, Exchange): + exchange = exchange.name + + try: + return publisher.publish(message, exchange.encode('utf8'), routing_key.encode('utf8')) + except Publisher.UnusablePublisher: + raise NotImplementedError(u'Sorry, this functionality if not yet implemented!') + + + def start(self, wait=True): + """ + Connect to broker. + :param wait: block until connection is ready + """ + self.listener.start() + self.snr.connect() + + #todo not really elegant + if wait: + while not self.snr.is_connected(): + time.sleep(0.1) + + def shutdown(self, wait=True): + """ + Terminate all connections, release resources - finish the job. + :param wait: block until this is done + """ + + self.snr.shutdown() + self.listener.terminate() + if wait: + self.listener.join() diff --git a/coolamqp/persistence/__init__.py b/coolamqp/clustering/single.py similarity index 68% rename from coolamqp/persistence/__init__.py rename to coolamqp/clustering/single.py index ae11e45a216c4367b3b02ad940b5917e3fb8260e..9fbb9e2683e8dd4c71ce7e977edee500c0cbd47a 100644 --- a/coolamqp/persistence/__init__.py +++ b/coolamqp/clustering/single.py @@ -1,11 +1,4 @@ # coding=UTF-8 -""" -This is the layer that makes your consumers and publishers survive connection losses. -It also renegotiates connections, shall they fail, and implements some sort of exponential delay policy. - -EVERYTHING HERE IS CALLED BY LISTENER THREAD UNLESS STATED OTHERWISE. - -""" from __future__ import print_function, absolute_import, division import six import logging @@ -17,8 +10,7 @@ logger = logging.getLogger(__name__) class SingleNodeReconnector(object): """ - This has a Listener Thread, a Node Definition, and an attache group, - and tries to keep all the things relatively alive. + Connection to one node. It will do it's best to remain alive. """ def __init__(self, node_def, attache_group, listener_thread): @@ -27,6 +19,9 @@ class SingleNodeReconnector(object): self.attache_group = attache_group self.connection = None + def is_connected(self): + return self.connection is not None + def connect(self): assert self.connection is None @@ -39,3 +34,7 @@ class SingleNodeReconnector(object): def on_fail(self): self.connection = None self.connect() + + def shutdown(self): + """Close this connection""" + self.connection.send(None) diff --git a/coolamqp/exceptions.py b/coolamqp/exceptions.py index 3d10193114dc336f4f3b93b15ba99fe25256ca32..f4ba6aa34585e02e437d65e82aa130bbdbf58908 100644 --- a/coolamqp/exceptions.py +++ b/coolamqp/exceptions.py @@ -29,8 +29,23 @@ class AMQPError(CoolAMQPError): """ Base class for errors received from AMQP server """ - def __init__(self, reply_code, reply_text, class_id, method_id): - self.reply_code = reply_code - self.reply_text = reply_text - self.class_id = class_id - self.method_id = method_id + def __init__(self, *args): + """ + + :param args: can be either reply_code, reply_text, class_id, method_id + or a ConnectionClose/ChannelClose. + """ + from coolamqp.framing.definitions import ConnectionClose, ChannelClose + + if isinstance(args[0], (ConnectionClose, ChannelClose)): + self.reply_code = args[0].reply_code + self.reply_text = args[0].reply_text + self.class_id = args[0].class_id + self.method_id = args[0].method_id + else: + assert len(args) == 4 + self.reply_code, self.reply_text, self.class_id, self.method_id = args + + +class ResourceLocked(AMQPError): + """Shorthand to catch that stuff easier""" diff --git a/coolamqp/objects.py b/coolamqp/objects.py index 3fd9c017c36f9ab20a8071f2a63c884f0ccf69a2..dda8ea5f1033a15b731971ff1120c228b239773f 100644 --- a/coolamqp/objects.py +++ b/coolamqp/objects.py @@ -25,6 +25,8 @@ class Message(object): for a list of possible properties. """ + Properties = MessageProperties # an alias for easier use + def __init__(self, body, properties=None): """ Create a Message object. @@ -35,11 +37,20 @@ class Message(object): :type body: str (py2) or bytes (py3) :param properties: AMQP properties to be sent along. default is 'no properties at all' + You can pass a dict - it will be passed to MessageProperties, + but it's slow - don't do that. + :type properties: MessageProperties instance, None or a dict """ if isinstance(body, six.text_type): raise TypeError('body cannot be a text type!') self.body = six.binary_type(body) - self.properties = properties or EMPTY_PROPERTIES + + if isinstance(properties, dict): + self.properties = MessageProperties(**properties) + elif properties is None: + self.properties = EMPTY_PROPERTIES + else: + self.properties = properties LAMBDA_NONE = lambda: None @@ -113,7 +124,7 @@ class Queue(object): This object represents a Queue that applications consume from or publish to. """ - def __init__(self, name='', durable=False, exchange=None, exclusive=False, auto_delete=False): + def __init__(self, name=u'', durable=False, exchange=None, exclusive=False, auto_delete=False): """ Create a queue definition. @@ -127,7 +138,7 @@ class Queue(object): :param exclusive: Is this queue exclusive? :param auto_delete: Is this queue auto_delete ? """ - self.name = name + self.name = name.encode('utf8') # if name is '', this will be filled in with broker-generated name upon declaration self.durable = durable self.exchange = exchange diff --git a/coolamqp/uplink/listener/__init__.py b/coolamqp/uplink/listener/__init__.py index 8dd81521b1f146442f610685d1cf9759bc5e4f2c..6e09bdfd2188ed56b734fcadeee2c4f3879ab522 100644 --- a/coolamqp/uplink/listener/__init__.py +++ b/coolamqp/uplink/listener/__init__.py @@ -2,7 +2,7 @@ """ A listener is a thread that monitors a bunch of sockets for activity. -Think "asyncio core" but I couldn't be bothered to learn Twisted. +Think "asyncio" but I couldn't be bothered to learn Twisted. It provides both for sending and receiving messages. It is written as a package, because the optimal network call, epoll, is not diff --git a/tests/run.py b/tests/run.py index dc6040c4ee68c585bcabfc5acfccdf73be61625f..f7fe4c6077fe797bf56791b0a4d71251980a309b 100644 --- a/tests/run.py +++ b/tests/run.py @@ -1,48 +1,24 @@ # coding=UTF-8 from __future__ import absolute_import, division, print_function -from coolamqp.uplink import ListenerThread import time, logging, threading -from coolamqp.objects import Message, MessageProperties, NodeDefinition -from coolamqp.uplink import Connection +from coolamqp.objects import Message, MessageProperties, NodeDefinition, Queue +from coolamqp.clustering import Cluster -from coolamqp.attaches import Consumer, Publisher, AttacheGroup -from coolamqp.objects import Queue -from coolamqp.persistence import SingleNodeReconnector import time NODE = NodeDefinition('127.0.0.1', 'user', 'user', heartbeat=20) -logging.basicConfig(level=logging.INFO) +logging.basicConfig(level=logging.DEBUG) if __name__ == '__main__': - lt = ListenerThread() - lt.start() + amqp = Cluster([NODE]) + amqp.start(wait=True) - ag = AttacheGroup() - snr = SingleNodeReconnector(NODE, ag, lt) - snr.connect() - ag.add(Consumer(Queue('siema-eniu'), no_ack=False, qos=(None, 20))) - - class IPublishThread(threading.Thread): - def __init__(self, ag): - super(IPublishThread, self).__init__() - self.ag = ag - self.daemon = True - - def run(self): - pub2 = Publisher(Publisher.MODE_NOACK) - self.ag.add(pub2) - while True: - pub2.publish(Message(b'you dawg', properties=MessageProperties(content_type='text/plain')), - routing_key=b'siema-eniu') - time.sleep(0.1) - - IPublishThread(ag).start() + c1 = amqp.consume(Queue(b'siema-eniu', exclusive=True), qos=(None, 20)) + c2 = amqp.consume(Queue(b'jo-malina', exclusive=True)) while True: time.sleep(30) - - - lt.terminate() + amqp.shutdown(True)