diff --git a/.gitignore b/.gitignore index 806e67f22f419e4c8dd46f0184e63e86a55f811c..a9dc4234365eff13880f4cf72e17a8e2634d3d8f 100644 --- a/.gitignore +++ b/.gitignore @@ -6,7 +6,7 @@ __pycache__/ # C extensions *.so - +.pycharm_helpers/ # Distribution / packaging .Python env/ diff --git a/README.md b/README.md index ec4bf1b7ce1c4ed040c826bb93a244c3b90638e7..5d3475141e6802420f276a33258554c0a552aa69 100644 --- a/README.md +++ b/README.md @@ -18,3 +18,8 @@ The project is actively maintained and used in a commercial project. Tests can r either on Vagrant (Vagrantfile attached) or Travis CI, and run against RabbitMQ. Enjoy! + + +## Notes +Assertions are sprinkled throughout the code. You may wish to run with optimizations enabled +if you need every CPU cycle you can get. \ No newline at end of file diff --git a/coolamqp/attaches/__init__.py b/coolamqp/attaches/__init__.py index 7d31da13c81e736dfdf24947c8c0f1dced8a26f6..9475c8e8cf4686ec316f993dbf301602018d76f8 100644 --- a/coolamqp/attaches/__init__.py +++ b/coolamqp/attaches/__init__.py @@ -7,4 +7,5 @@ These duties almost require allocating a channel. The attache becomes then respo Attache should also register at least one on_fail watch, so it can handle things if they go south. """ -from coolamqp.attaches.consumer import Consumer \ No newline at end of file +from coolamqp.attaches.consumer import Consumer +from coolamqp.attaches.publisher import Publisher, MODE_NOACK, MODE_CNPUB diff --git a/coolamqp/attaches/channeler.py b/coolamqp/attaches/channeler.py new file mode 100644 index 0000000000000000000000000000000000000000..1b4b4392dea89af1540ff29638ea268e3200c3ea --- /dev/null +++ b/coolamqp/attaches/channeler.py @@ -0,0 +1,177 @@ +# coding=UTF-8 +""" +Base class for consumer or publisher with the capabiility to +set up and tear down channels +""" +from __future__ import print_function, absolute_import, division +import six +from coolamqp.framing.frames import AMQPMethodFrame, AMQPBodyFrame, AMQPHeaderFrame +from coolamqp.framing.definitions import ChannelOpen, ChannelOpenOk, BasicConsume, \ + BasicConsumeOk, QueueDeclare, QueueDeclareOk, ExchangeDeclare, ExchangeDeclareOk, \ + QueueBind, QueueBindOk, ChannelClose, ChannelCloseOk, BasicCancel, BasicDeliver, \ + BasicAck, BasicReject, ACCESS_REFUSED, RESOURCE_LOCKED, BasicCancelOk +from coolamqp.uplink import HeaderOrBodyWatch, MethodWatch + + + +ST_OFFLINE = 0 # Consumer is *not* consuming, no setup attempts are being made +ST_SYNCING = 1 # A process targeted at consuming has been started +ST_ONLINE = 2 # Consumer is declared all right + + +class Channeler(object): + """ + A base class for Consumer/Publisher implementing link set up and tear down. + + A channeler can be essentially in 4 states: + - ST_OFFLINE (.channel is None): channel is closed, object is unusable. Requires an attach() a connection + that is being established, or open, or whatever. Connection will notify + this channeler that it's open. + - ST_SYNCING: channeler is opening a channel/doing some other things related to it's setup. + it's going to be ST_ONLINE soon, or go back to ST_OFFLINE. + It has, for sure, acquired a channel number. + - ST_ONLINE: channeler is operational. It has a channel number and has declared everything + it needs to. + + on_operational(True) will be called when a transition is made TO this state. + on_operational(False) will be called when a transition is made FROM this state. + + - ST_OFFLINE (.channel is not None): channeler is undergoing a close. It has not yet torn down the channel, + but ordering it to do anything is pointless, because it will not get done + until attach() with new connection is called. + """ + + def __init__(self): + """ + [EXTEND ME!] + """ + self.state = ST_OFFLINE + self.connection = None + self.channel_id = None # channel obtained from Connection + + def attach(self, connection): + """ + Attach this object to a live Connection. + + :param connection: Connection instance to use + """ + self.connection = connection + connection.call_on_connected(self.on_uplink_established) + + # ------- event handlers + + def on_operational(self, operational): + """ + [EXTEND ME] Called by internal methods (on_*) when channel has achieved (or lost) operational status. + + If this is called with operational=True, then for sure it will be called with operational=False. + + This will, therefore, get called an even number of times. + + :param operational: True if channel has just become operational, False if it has just become useless. + """ + + def on_close(self, payload=None): + """ + [EXTEND ME] Handler for channeler destruction. + + Called on: + - channel exception + - connection failing + + This handles following situations: + - payload is None: this means that connection has gone down hard, so our Connection object is + probably very dead. Transition to ST_OFFLINE (.channel is None) + - payload is a ChannelClose: this means that a channel exception has occurred. Dispatch a ChannelCloseOk, + attempt to log an exception, transition to ST_OFFLINE (.channel is None) + - payload is a ChannelCloseOk: this means that it was us who attempted to close the channel. Return the channel + to free pool, transition to ST_OFFLINE (.channel is None) + + If you need to handle something else, extend this. Take care that this DOES NOT HANDLE errors that happen + while state is ST_SYNCING. You can expect this to handle a full channel close, therefore releasing all + resources, so it mostly will do *the right thing*. + + If you need to do something else than just close a channel, please extend or modify as necessary. + + """ + if self.state == ST_ONLINE: + # The channel has just lost operationality! + self.on_operational(False) + self.state = ST_OFFLINE + + if payload is None: + # Connection went down HARD + self.connection.free_channels.put(self.channel_id) + self.channel_id = None + elif isinstance(payload, ChannelClose): + # We have failed + print('Channel close: RC=%s RT=%s', payload.reply_code, payload.reply_text) + self.connection.free_channels.put(self.channel_id) + self.channel_id = None + + elif isinstance(payload, ChannelCloseOk): + self.connection.free_channels.put(self.channel_id) + self.channel_id = None + else: + raise Exception('Unrecognized payload - did you forget to handle something? :D') + + def methods(self, payloads): + """ + Syntactic sugar for + + for payload in paylods: + self.method(payload) + + But moar performant. + """ + assert self.channel_id is not None + frames = [AMQPMethodFrame(self.channel_id, payload) for payload in payloads] + self.connection.send(frames) + + def method(self, payload): + """ + Syntactic sugar for: + + self.connection.send([AMQPMethodFrame(self.channel_id, payload)]) + """ + self.methods([payload]) + + def method_and_watch(self, method_payload, method_classes_to_watch, callable): + """ + Syntactic sugar for + + self.connection.method_and_watch(self.channel_id, + method_payload, + method_classes_to_watch, + callable) + """ + assert self.channel_id is not None + self.connection.method_and_watch(self.channel_id, method_payload, method_classes_to_watch, callable) + + def on_setup(self, payload): + """ + [OVERRIDE ME!] Called with a method frame that signifies a part of setup. + + You must be prepared to handle at least a payload of ChannelOpenOk + + :param payload: AMQP method frame payload + """ + raise Exception('Abstract method - override me!') + + + def on_uplink_established(self): + """Called by connection. Connection reports being ready to do things.""" + self.state = ST_SYNCING + self.channel_id = self.connection.free_channels.pop() + + self.connection.watch_for_method(self.channel_id, (ChannelClose, ChannelCloseOk, BasicCancel), + self.on_close, + on_fail=self.on_close) + + self.connection.method_and_watch( + self.channel_id, + ChannelOpen(), + ChannelOpenOk, + self.on_setup + ) + diff --git a/coolamqp/attaches/consumer.py b/coolamqp/attaches/consumer.py index 7f628cd2fd100eeb88bfaf275d24f60b735fd6d3..43769cf5215f9f8a2c29aaec925c7b9bc66e9500 100644 --- a/coolamqp/attaches/consumer.py +++ b/coolamqp/attaches/consumer.py @@ -1,26 +1,17 @@ # coding=UTF-8 from __future__ import absolute_import, division, print_function import six -from coolamqp.framing.frames import AMQPMethodFrame, AMQPBodyFrame, AMQPHeaderFrame -from coolamqp.framing.definitions import ChannelOpen, ChannelOpenOk, BasicConsume, \ +from coolamqp.framing.frames import AMQPBodyFrame, AMQPHeaderFrame +from coolamqp.framing.definitions import ChannelOpenOk, BasicConsume, \ BasicConsumeOk, QueueDeclare, QueueDeclareOk, ExchangeDeclare, ExchangeDeclareOk, \ - QueueBind, QueueBindOk, ChannelClose, ChannelCloseOk, BasicCancel, BasicDeliver, \ + QueueBind, QueueBindOk, ChannelClose, BasicCancel, BasicDeliver, \ BasicAck, BasicReject, ACCESS_REFUSED, RESOURCE_LOCKED, BasicCancelOk from coolamqp.uplink import HeaderOrBodyWatch, MethodWatch +from coolamqp.attaches.channeler import Channeler, ST_ONLINE, ST_OFFLINE -ST_OFFLINE = 0 # Consumer is *not* consuming, no setup attempts are being made -ST_SYNCING = 1 # A process targeted at consuming has been started -ST_ONLINE = 2 # Consumer is declared all right - -EV_ONLINE = 7 # called upon consumer being online and consuming -EV_CANCEL = 8 # consumer has been cancelled by BasicCancel. - # EV_OFFLINE will follow immediately -EV_OFFLINE = 9 # channel down -EV_MESSAGE = 10 # received a message - -class Consumer(object): +class Consumer(Channeler): """ This object represents a consumer in the system. @@ -37,14 +28,6 @@ class Consumer(object): Since this implies cancelling the consumer, here you go. """ - def attach(self, connection): - """ - Attach this consumer to a connection - :param connection: coolamqp.framing.Connection - """ - self.connection = connection - connection.call_on_connected(self.on_uplink_established) - def __init__(self, queue, no_ack=True, qos=None, dont_pause=False): """ To be instantiated only by Cluster @@ -55,50 +38,26 @@ class Consumer(object): :param no_ack: Will this consumer require acknowledges from messages? :param dont_pause: Consumer will fail on the spot instead of pausing """ - self.state = ST_OFFLINE + super(Consumer, self).__init__() + self.queue = queue self.no_ack = no_ack # private - self.connection = None # broker on which was last seen - self.channel_id = None self.cancelled = False # did the client want to STOP using this consumer? self.receiver = None # MessageReceiver instance - def on_event(self, event, arg=None): - """ - Called upon events arriving. Possible events are: - arg={EV_ONLINE} called directly after setup. self.state is not yet set! - args={EV_CANCEL} seen a RabbitMQ consumer cancel notify. EV_OFFLINE follows - args={EV_OFFLINE} sent when a channel is no longer usable. It may not yet have been torn down. - this will be called only if EV_ONLINE was previously dispatched - arg={EV_MESSAGE} - """ - assert event in (EV_OFFLINE, EV_CANCEL, EV_MESSAGE, EV_ONLINE) + def on_operational(self, operational): + super(Consumer, self).on_operational(operational) - if event == EV_OFFLINE and (self.state is not ST_ONLINE): - return # No point in processing that - - if event == EV_ONLINE: - print('Entering ST_ONLINE') - self.state = ST_ONLINE + if operational: assert self.receiver is None self.receiver = MessageReceiver(self) - - elif event == EV_OFFLINE: + else: self.receiver.on_gone() self.receiver = None - def __stateto(self, st): - """if st is not current state, statify it. - As an extra if it's a transition to ST_OFFLINE, send an event""" - if (self.state == ST_ONLINE) and (st == ST_OFFLINE): - # the only moment when EV_OFFLINE is generated - self.on_event(EV_OFFLINE) - - self.state = st - def on_close(self, payload=None): """ Handle closing the channel. It sounds like an exception... @@ -108,44 +67,30 @@ class Consumer(object): be there 2. self.channel_id <- None, channel is returned to Connection - channel has been physically torn down """ - should_retry = False - release_channel = False + if self.state == ST_ONLINE: + # The channel has just lost operationality! + self.on_operational(False) + self.state = ST_OFFLINE - print('HAYWIRE ', payload) + should_retry = False if isinstance(payload, BasicCancel): # Consumer Cancel Notification - by RabbitMQ - self.on_event(EV_CANCEL) - self.__stateto(ST_OFFLINE) - self.connection.send([AMQPMethodFrame(self.channel_id, BasicCancelOk()), - AMQPMethodFrame(self.channel_id, ChannelClose(0, b'Received basic.cancel', 0, 0))]) - return - - if isinstance(payload, BasicCancelOk): - self.__stateto(ST_OFFLINE) - # proceed with teardown - self.connection.send([AMQPMethodFrame(self.channel_id, ChannelClose(0, b'Received basic.cancel-ok', 0, 0))]) - return - - # at this point this can be only ChannelClose, ChannelCloseOk or on_fail - # release the kraken - - if isinstance(payload, ChannelClose): - # it sounds like an exception - self.__stateto(ST_OFFLINE) - print(payload.reply_code, payload.reply_text) - self.connection.send([AMQPMethodFrame(self.channel_id, ChannelCloseOk())]) - should_retry = payload.reply_code in (ACCESS_REFUSED, RESOURCE_LOCKED) - - if self.channel_id is not None: - self.__stateto(ST_OFFLINE) - self.connection.unwatch_all(self.channel_id) - self.connection.free_channels.append(self.channel_id) - self.channel_id = None - self.remaining_for_ack = set() + self.methods([BasicCancelOk(), ChannelClose(0, b'Received basic.cancel', 0, 0)]) + + elif isinstance(payload, BasicCancelOk): + # OK, our cancelling went just fine - proceed with teardown + self.method(ChannelClose(0, b'Received basic.cancel-ok', 0, 0)) + + elif isinstance(payload, ChannelClose): + if payload.reply_code in (ACCESS_REFUSED, RESOURCE_LOCKED): + should_retry = True + super(Consumer, self).on_close(payload) + else: + super(Consumer, self).on_close(payload) if should_retry: - self.on_uplink_established() # retry + self.attach(self.connection) def on_delivery(self, sth): """ @@ -164,12 +109,6 @@ class Consumer(object): def on_setup(self, payload): """Called with different kinds of frames - during setup""" - if self.cancelled: - # We were declaring this, but during this situation this - # consumer was cancelled. Close the channel and things. - self.connection.send(self.channel_id, ChannelClose(0, 'Consumer cancelled', 0, 0)) - return - if isinstance(payload, ChannelOpenOk): # Do we need to declare the exchange? @@ -217,15 +156,10 @@ class Consumer(object): # We need any form of binding. if self.queue.exchange is not None: - self.connection.method_and_watch( - self.channel_id, + self.method_and_watch( QueueBind( - self.queue.name.encode('utf8'), - self.queue.exchange.name.encode('utf8'), - b'', - False, - [] - ), + self.queue.name.encode('utf8'), self.queue.exchange.name.encode('utf8'), + b'', False, []), QueueBindOk, self.on_setup ) @@ -234,24 +168,15 @@ class Consumer(object): self.on_setup(QueueBindOk()) elif isinstance(payload, QueueBindOk): # itadakimasu - self.connection.method_and_watch( - self.channel_id, - BasicConsume( - self.queue.name.encode('utf8'), - self.queue.name.encode('utf8'), - False, - self.no_ack, - self.queue.exclusive, - False, - [] - ), + self.method_and_watch( + BasicConsume(self.queue.name.encode('utf8'), self.queue.name.encode('utf8'), + False, self.no_ack, self.queue.exclusive, False, []), BasicConsumeOk, self.on_setup ) elif isinstance(payload, BasicConsumeOk): - # AWWW RIGHT~!!! - self.state = ST_ONLINE + # AWWW RIGHT~!!! We're good. # Register watches for receiving shit self.connection.watch(HeaderOrBodyWatch(self.channel_id, self.on_delivery)) @@ -259,27 +184,10 @@ class Consumer(object): mw.oneshot = False self.connection.watch(mw) - self.on_event(EV_ONLINE) - - def on_uplink_established(self): - """Consumer was created or uplink was regained. Try to declare it""" - if self.cancelled: - return # it's OK. - - self.state = ST_SYNCING - self.channel_id = self.connection.free_channels.pop() + self.state = ST_ONLINE + self.on_operational(True) - self.connection.watch_for_method(self.channel_id, - (ChannelClose, ChannelCloseOk, BasicCancel), - self.on_close, - on_fail=self.on_close) - self.connection.method_and_watch( - self.channel_id, - ChannelOpen(), - ChannelOpenOk, - self.on_setup - ) @@ -335,11 +243,9 @@ class MessageReceiver(object): return # already confirmed/rejected if success: - self.consumer.connection.send([AMQPMethodFrame(self.consumer.channel_id, - BasicAck(delivery_tag, False))]) + self.consumer.method(BasicAck(delivery_tag, False)) else: - self.consumer.connection.send([AMQPMethodFrame(self.consumer.channel_id, - BasicReject(delivery_tag, True))]) + self.consumer.method(BasicReject(delivery_tag, True)) return callable diff --git a/coolamqp/attaches/publisher.py b/coolamqp/attaches/publisher.py new file mode 100644 index 0000000000000000000000000000000000000000..6dc75e99c28e6e529ae585d60f0da170c7fc61a8 --- /dev/null +++ b/coolamqp/attaches/publisher.py @@ -0,0 +1,146 @@ +# coding=utf-8 +""" +Module used to publish messages. + +Expect wild NameErrors if you build this without RabbitMQ extensions (enabled by default), +and try to use MODE_CNPUB. + +If you use a broker that doesn't support these, just don't use MODE_CNPUB. CoolAMQP is smart enough +to check with the broker beforehand. +""" +from __future__ import absolute_import, division, print_function +import six +import warnings +import logging +import collections +from coolamqp.framing.definitions import ChannelOpenOk + +try: + # these extensions will be available + from coolamqp.framing.definitions import ConfirmSelect, ConfirmSelectOk +except ImportError: + pass + +from coolamqp.attaches.channeler import Channeler, ST_ONLINE, ST_OFFLINE +from coolamqp.uplink import PUBLISHER_CONFIRMS + + +logger = logging.getLogger(__name__) + +MODE_NOACK = 0 +MODE_CNPUB = 1 # this will be available only if suitable extensions were used + + +class Publisher(Channeler): + """ + An object that is capable of sucking into a Connection and sending messages. + Depending on it's characteristic, it may process messages in: + + - non-ack mode (default) - messages will be dropped on the floor if there is no active uplink + - Consumer Publish mode - requires broker support, each message will be ACK/NACKed by the broker + messages will survive broker reconnections. + + If you support this, it is your job to ensure that broker supports + publisher_confirms. If it doesn't, this publisher will enter ST_OFFLINE + and emit a warning. + + Other modes may be added in the future. + """ + + def __init__(self, mode): + """ + Create a new publisher + :param mode: Publishing mode to use. One of: + MODE_NOACK - use non-ack mode + MODE_CNPUB - use consumer publishing mode. TypeError will be raised when this publisher + if attached to a consumer that doesn't have consumer publishes negotiated + :type mode: MODE_NOACK or MODE_CNPUB + :raise ValueError: mode invalid + """ + super(Publisher, self).__init__() + if mode not in (MODE_NOACK, MODE_CNPUB): + raise ValueError(u'Invalid publisher mode') + + self.mode = mode + + self.messages = collections.deque() # Messages to publish. From newest to last. + # tuple of (Message object, exchange name::str, routing_key::str, + # Future to confirm or None, flags as tuple|empty tuple + + self.delivery_tag = 0 # next delivery tag + + + def publish(self, message, exchange_name=b'', routing_key=b''): + """ + Schedule to have a message published. + + :param message: Message object to send + :param exchange_name: exchange name to use. Default direct exchange by default + :param routing_key: routing key to use + :return: a Future object symbolizing delivering the message to AMQP (or any else guarantees publisher mode + will make). + This is None when mode is noack + """ + # Formulate the request + if self.mode == MODE_NOACK: + + # If we are not connected right now, drop the message on the floor and log it with DEBUG + if self.state != ST_ONLINE: + logger.debug(u'Publish request, but not connected - dropping the message') + else: + # Dispatch! + pass + + + + self.messages.append(( + message, + exchange_name, + routing_key, + None, + () + )) + else: + fut = u'banana banana banana' + self.messages.append(( + message, + exchange_name, + routing_key, + fut + )) + return fut + + # Attempt dispatching messages as possible + if self.mode == MODE_NOACK: + pass + + def on_setup(self, payload): + + # Assert that mode is OK + if self.mode == MODE_CNPUB: + if PUBLISHER_CONFIRMS not in self.connection.extensions: + warnings.warn(u'Broker does not support publisher_confirms, refusing to start publisher', + RuntimeWarning) + self.state = ST_OFFLINE + return + + if isinstance(payload, ChannelOpenOk): + # Ok, if this has a mode different from MODE_NOACK, we need to additionally set up + # the functionality. + + if self.mode == MODE_CNPUB: + self.method_and_watch(ConfirmSelect(False), ConfirmSelectOk, self.on_setup) + elif self.mode == MODE_NOACK: + # A-OK! Boot it. + self.state = ST_ONLINE + self.on_operational(True) + + elif self.mode == MODE_CNPUB: + # Because only in this case it makes sense to check for MODE_CNPUB + if isinstance(payload, ConfirmSelectOk): + # A-OK! Boot it. + self.state = ST_ONLINE + self.on_operational(True) + + + diff --git a/coolamqp/attaches/utils.py b/coolamqp/attaches/utils.py new file mode 100644 index 0000000000000000000000000000000000000000..25dd9c6a8bb9d7fe5701a13df624c5bc798b27d5 --- /dev/null +++ b/coolamqp/attaches/utils.py @@ -0,0 +1,209 @@ +# coding=UTF-8 +from __future__ import print_function, absolute_import, division +import six +import logging +import threading + +logger = logging.getLogger(__name__) + + +class ConfirmableRejectable(object): + """ + Protocol for objects put into AtomicTagger. You need not subclass it, + just support this protocol. + """ + + def confirm(self): + """ + This has been ACK'd + :return: don't care + """ + + def reject(self): + """ + This has been REJECT'd/NACK'd + :return: don't care + """ + +class ManualConfirmableRejectable(ConfirmableRejectable): + """ + A callback-based way to create ConfirmableRejectable objects + """ + def __init__(self, on_ack, on_nack): + """ + :param on_ack: callable/0, will be called on .confirm + :param on_nack: callable/0, will be called on .reject + """ + self.on_ack = on_ack + self.on_nack = on_nack + + def confirm(self): + self.on_ack() + + def reject(self): + self.on_nack() + + +class AtomicTagger(object): + """ + This implements a thread-safe dictionary of (integer=>ConfirmableRejectable | None), + used for processing delivery tags / (negative) acknowledgements. + - you can requisition a key. This key belongs only to you, and the whole world + doesn't know you have it. + + delivery_tag_to_use = tagger.get_key() + + - you can deposit a ConfirmableRejectable into the tagger. + + tagger.deposit(delivery_tag, message) + + After you do so, this tag is subject to be acked/nacked. Read on. + + - you can (multiple)(ack/nack) messages. This coresponds to multiple bit + used in basic.ack/basic.nack. + + If this is done, your message objects (that MUST implement the + ConfirmableRejectable protocol) will have respective methods called. + These methods MUST NOT depend on particular state of locking by this + object. + + Thread safety is implemented using reentrant locking. The lock object is a + threading.RLock, and you can access it at atomicTagger.lock. + + Please note that delivery tags are increasing non-negative integer. + Therefore, X>Y implies that sending/receiving X happened after Y. + + Note that key/delivery_tag of 0 has special meaning of "everything so far". + + This has to be fast for most common cases. Corner cases will be resolved correctly, + but maybe not fast. + """ + + def __init__(self): + self.lock = threading.RLock() + + # Protected by lock + self.next_tag = 1 # 0 is AMQP-reserved to mean "everything so far" + self.tags = [] # a list of (tag, ConfirmableRejectable) + # they remain to be acked/nacked + # invariant: FOR EACH i, j: (i>j) => (tags[i][0] > tags[j][0]) + + def deposit(self, tag, obj): + """ + Put a tag into the tag list. + + Putting the same tag more than one time will result in undefined behaviour. + + :param tag: non-negative integer + :param obj: ConfirmableRejectable + if you put something that isn't a ConfirmableRejectable, you won't get bitten + until you call .ack() or .nack(). + """ + assert tag >= 0 + opt = (tag, obj) + + with self.lock: + if len(self.tags) == 0: + self.tags.append(opt) + elif self.tags[-1][0] < tag: + self.tags.append(opt) + else: + # Insert a value at place where it makes sense. Iterate from the end, because + # values will usually land there... + i = len(self.tags) - 1 # start index + + while i>0: # this will terminate at i=0 + if self.tags[i][0] > tag: # this means we should insert it here... + break + i -= 1 # previousl index + + self.tags.insert(i, opt) + + def __acknack(self, tag, multiple, ack): + """ + :param tag: Note that 0 means "everything" + :param ack: True to ack, False to nack + """ + # Compute limits - they go from 0 to somewhere + with self.lock: + start = 0 + # start and stop will signify the PYTHON SLICE parameters + + if tag > 0: + + if multiple: + # Compute the ranges + for stop, opt in enumerate(self.tags): + if opt[0] == tag: + stop += 1 # this is exactly this tag. Adjust stop to end one further (Python slicing) and stop + break + if opt[0] > tag: + break # We went too far, but it's OK, we don't need to bother with adjusting stop + else: + # List finished without breaking? That would mean the entire range! + stop = len(self.tags) + else: + # Just find that piece + for index, opt in enumerate(self.tags): + if opt[0] == tag: + stop = index + 1 + break + else: + return # not found! + + + if not multiple: + start = stop-1 + else: + # Oh, I know the range! + stop = len(self.tags) + + print('Range computed of %s:%s' % (start, stop)) + + items = self.tags[start:stop] + del self.tags[start:stop] + + for tag, cr in items: + if ack: + cr.confirm() + else: + cr.reject() + + def ack(self, tag, multiple): + """ + Acknowledge given objects. + + If multiple, objects UP TO AND INCLUDING tag will have .confirm() called. + If it's false, only this precise objects will have done so. + It this object does not exist, nothing will happen. Acking same tag more than one time + is a no-op. + + Things acked/nacked will be evicted from .data + :param tag: delivery tag to use. Note that 0 means "everything so far" + """ + self.__acknack(tag, multiple, True) + + def nack(self, tag, multiple): + """ + Acknowledge given objects. + + If multiple, objects UP TO AND INCLUDING tag will have .confirm() called. + If it's false, only this precise objects will have done so. + It this object does not exist, nothing will happen. Acking same tag more than one time + is a no-op. + + Things acked/nacked will be evicted from .data + :param tag: delivery tag to use. Note that 0 means "everything so far" + """ + self.__acknack(tag, multiple, False) + + def get_key(self): + """ + Return a key. It won't be seen here until you deposit it. + + It's just yours, and you can do whatever you want with it, even drop on the floor. + :return: a positive integer + """ + with self.lock: + self.next_tag += 1 + return self.next_tag - 1 diff --git a/coolamqp/backends/__init__.py b/coolamqp/backends/__init__.py index 77106aaea6de30fafc10248afa1fc5d5ec8e89d0..b7d705d70cb3bab249f88d6926223aae4420f817 100644 --- a/coolamqp/backends/__init__.py +++ b/coolamqp/backends/__init__.py @@ -1,3 +1,6 @@ # coding=UTF-8 -from coolamqp.backends.pyamqp import PyAMQPBackend +from coolamqp.backends.coolamqp import CoolAMQPBackend from coolamqp.backends.base import AMQPError, ConnectionFailedError, RemoteAMQPError, Cancelled +""" +Backend is a legacy way to access CoolAMQP. +""" \ No newline at end of file diff --git a/coolamqp/backends/base.py b/coolamqp/backends/base.py index ad1b34ff77fff1029b704697ae0012218c7c2b2e..47e4af6b103b3a9bf2275866c050f0e306958aab 100644 --- a/coolamqp/backends/base.py +++ b/coolamqp/backends/base.py @@ -16,10 +16,6 @@ class ConnectionFailedError(AMQPError): return u'ConnectionFailedError("%s")' % map(repr, (self.reply_text, )) -class Discarded(Exception): - """send() for this message had discard_on_retry""" - - class Cancelled(Exception): """Cancel ordered by user""" diff --git a/coolamqp/backends/coolamqp.py b/coolamqp/backends/coolamqp.py new file mode 100644 index 0000000000000000000000000000000000000000..8ca02034f6401bbc106f2a0d3a2ae7a3dcf81dcd --- /dev/null +++ b/coolamqp/backends/coolamqp.py @@ -0,0 +1,114 @@ +# coding=UTF-8 +from __future__ import absolute_import, division, print_function + + +from coolamqp.backends.base import AMQPBackend + + +# Create a global ListenerThread +from coolamqp.uplink import ListenerThread +GLOBALS = { + 'listener': ListenerThread() +} + + +class CoolAMQPBackend(AMQPBackend): + """ + A backend utilizing CoolAMQP's coolamqp.attaches and coolamqp.connection. + Backend starts with creating a connection, and ends with blanging it. + """ + def __init__(self, cluster_node, cluster_handler_thread): + """ + Connects to an AMQP backend. + """ + self.cluster_handler_thread = cluster_handler_thread + + def process(self, max_time=10): + """ + Do bookkeeping, process messages, etc. + :param max_time: maximum time in seconds this call can take + :raises ConnectionFailedError: if connection failed in the meantime + """ + + def exchange_declare(self, exchange): + """ + Declare an exchange + :param exchange: Exchange object + """ + + def exchange_delete(self, exchange): + """ + Delete an exchange + :param exchange: Exchange object + """ + + def queue_bind(self, queue, exchange, routing_key=''): + """ + Bind a queue to an exchange + :param queue: Queue object + :param exchange: Exchange object + :param routing_key: routing key to use + """ + + def queue_delete(self, queue): + """ + Delete a queue. + + :param queue: Queue + """ + + + def queue_declare(self, queue): + """ + Declare a queue. + + This will change queue's name if anonymous + :param queue: Queue + """ + + def basic_cancel(self, consumer_tag): + """ + Cancel consuming, identified by a consumer_tag + :param consumer_tag: consumer_tag to cancel + """ + + def basic_consume(self, queue, no_ack=False): + """ + Start consuming from a queue + :param queue: Queue object + :param no_ack: Messages will not need to be ack()ed for this queue + """ + + def basic_ack(self, delivery_tag): + """ + ACK a message. + :param delivery_tag: delivery tag to ack + """ + + def basic_qos(self, prefetch_size, prefetch_count, global_): + """ + Issue a basic.qos(prefetch_size, prefetch_count, True) against broker + :param prefetch_size: prefetch window size in octets + :param prefetch_count: prefetch window in terms of whole messages + """ + + def basic_reject(self, delivery_tag): + """ + Reject a message + :param delivery_tag: delivery tag to reject + """ + + def basic_publish(self, message, exchange, routing_key): + """ + Send a message + :param message: Message object to send + :param exchange: Exchange object to publish to + :param routing_key: routing key to use + """ + + def shutdown(self): + """ + Close this connection. + This is not allowed to return anything or raise + """ + self.cluster_handler_thread = None # break GC cycles diff --git a/coolamqp/cluster.py b/coolamqp/cluster.py deleted file mode 100644 index 21bbb37e85f831ee7bfebb916245c30bec883ab8..0000000000000000000000000000000000000000 --- a/coolamqp/cluster.py +++ /dev/null @@ -1,240 +0,0 @@ -# coding=UTF-8 -import itertools -from six.moves import queue as Queue -from coolamqp.backends import PyAMQPBackend -from coolamqp.backends.base import Discarded -from coolamqp.orders import SendMessage, ConsumeQueue, DeclareExchange, CancelQueue, DeleteQueue, \ - DeleteExchange, SetQoS, DeclareQueue, Order -from coolamqp.messages import Exchange - - -class ClusterNode(object): - """ - Definition of a reachable AMQP node. - - This object is hashable. - """ - - def __init__(self, *args, **kwargs): - """ - Create a cluster node definition. - - a = ClusterNode(host='192.168.0.1', user='admin', password='password', - virtual_host='vhost') - - or - - a = ClusterNode('192.168.0.1', 'admin', 'password') - - Additional keyword parameters that can be specified: - heartbeat - heartbeat interval in seconds - """ - - self.heartbeat = kwargs.pop('heartbeat', None) - - if len(kwargs) > 0: - # Prepare arguments for amqp.connection.Connection - self.host = kwargs['host'] - self.user = kwargs['user'] - self.password = kwargs['password'] - self.virtual_host = kwargs.get('virtual_host', '/') - elif len(args) == 3: - self.host, self.user, self.password = args - self.virtual_host = '/' - elif len(args) == 4: - self.host, self.user, self.password, self.virtual_host = args - else: - raise NotImplementedError #todo implement this - - def __str__(self): - return '%s@%s/%s' % (self.host, - self.user, - self.virtual_host) - - -class Cluster(object): - """ - Represents connection to an AMQP cluster. This internally connects only to one node, but - will select another one upon connection failing. - - You can pass callbacks to most commands. They will also return an Order instance, - that you can wait for to know an operation has completed. - - Callbacks are executed before Order is marked as complete (it's .result() returns), so if you do: - - cluster.send(.., on_completed=hello).result() - bye() - - hello will be called before bye is called. - """ - - def __init__(self, nodes, backend=PyAMQPBackend): - """ - Construct the cluster definition - :param nodes: iterable of nodes to try connecting, in this order. - if list if exhaused, it will be started from beginning - :param backend: backend to use - """ - - self.backend = backend - self.node_to_connect_to = itertools.cycle(nodes) - - self.connected = False #: public, is connected to broker? - - from .handler import ClusterHandlerThread - self.thread = ClusterHandlerThread(self) - - def send(self, message, exchange=None, routing_key='', discard_on_fail=False, on_completed=None, on_failed=None): - """ - Schedule a message to be sent. - :param message: Message object to send. - :param exchange: Exchange to use. Leave None to use the default exchange - :param routing_key: routing key to use - :param discard_on_fail: if True, then message is valid for sending ONLY with current connection. - Will be discarded upon fail. - :param on_completed: callable/0 to call when this succeeds - :param on_failed: callable/1 to call when this fails with AMQPError instance - or Cancelled instance if user cancelled this order - or Discarded instance if message discarded due to 'discard_on_fail' - :return: a Future with this order's status - """ - a = SendMessage(message, exchange or Exchange.direct, routing_key, - discard_on_fail=discard_on_fail, - on_completed=on_completed, on_failed=on_failed) - - if discard_on_fail and self.thread.backend is None: - o = Order() - o.discarded = True - on_failed(Discarded()) - return o - # discard at once if no point in sending - - self.thread.order_queue.append(a) - return a - - def declare_exchange(self, exchange, on_completed=None, on_failed=None): - """ - Declare an exchange. It will be re-declared upon reconnection. - - :param exchange: Exchange to declare - :param on_completed: callable/0 to call when this succeeds - :param on_failed: callable/1 to call when this fails with AMQPError instance - :return: a Future with this order's status - """ - a = DeclareExchange(exchange, on_completed=on_completed, on_failed=on_failed) - self.thread.order_queue.append(a) - return a - - def declare_queue(self, queue, on_completed=None, on_failed=None): - """ - Declares a queue. - - !!!! If you declare a queue and NOT consume from it, it will not be re-declared - upon reconnection !!!! - - :param queue: Queue to declare - :param on_completed: callable/0 to call when this succeeds - :param on_failed: callable/1 to call when this fails with AMQPError instance - :return: a Future with this order's status - """ - a = DeclareQueue(queue, on_completed=on_completed, on_failed=on_failed) - self.thread.order_queue.append(a) - return a - - def delete_exchange(self, exchange, on_completed=None, on_failed=None): - """ - Delete an exchange - :param exchange: Exchange to delete - :param on_completed: callable/0 to call when this succeeds - :param on_failed: callable/1 to call when this fails with AMQPError instance - :return: a Future with this order's status - """ - a = DeleteExchange(exchange, on_completed=on_completed, on_failed=on_failed) - self.thread.order_queue.append(a) - return a - - def delete_queue(self, queue, on_completed=None, on_failed=None): - """ - Delete a queue - :param queue: Queue to delete - :param on_completed: callable/0 to call when this succeeds - :param on_failed: callable/1 to call when this fails with AMQPError instance - :return: a Future with this order's status - """ - a = DeleteQueue(queue, on_completed=on_completed, on_failed=on_failed) - self.thread.order_queue.append(a) - return a - - def cancel(self, queue, on_completed=None, on_failed=None): - """ - Cancel consuming from a queue - - :param queue: Queue to consume from - :param on_completed: callable/0 to call when this succeeds - :param on_failed: callable/1 to call when this fails with AMQPError instance - :return: a Future with this order's status - """ - a = CancelQueue(queue, on_completed=on_completed, on_failed=on_failed) - self.thread.order_queue.append(a) - return a - - def qos(self, prefetch_window, prefetch_count, global_=True): - a = SetQoS(prefetch_window, prefetch_count, global_) - self.thread.order_queue.append(a) - return a - - def consume(self, queue, no_ack=False, on_completed=None, on_failed=None): - """ - Start consuming from a queue - - This queue will be declared to the broker. If this queue has any binds - (.exchange field is not empty), queue will be binded to exchanges. - - :param queue: Queue to consume from - :param on_completed: callable/0 to call when this succeeds - :param no_ack: if True, you will not need to call .ack() for this queue - :param on_failed: callable/1 to call when this fails with AMQPError instance - :return: a Future with this order's status - """ - a = ConsumeQueue(queue, no_ack=no_ack, on_completed=on_completed, on_failed=on_failed) - self.thread.order_queue.append(a) - return a - - def drain(self, wait=0): - """ - Return a ClusterEvent on what happened, or None if nothing could be obtained - within given time - :param wait: Interval to wait for events. - Finite number to wait this much seconds before returning None - None to wait for infinity - 0 to return immediately - :return: a ClusterEvent instance or None - """ - try: - if wait == 0: - return self.thread.event_queue.get(False) - else: - return self.thread.event_queue.get(True, wait) - except Queue.Empty: - return None - - def start(self): - """ - Connect to the cluster. - :return: self - """ - self.thread.start() - return self - - def shutdown(self, complete_remaining_tasks=False): - """ - Cleans everything and returns. - - :param complete_remaining_tasks_tasks: if set to True, pending operations will be completed. - If False, thread will exit without completing them. - This can mean that if the cluster doesn't come up online, shutdown MAY BLOCK FOREVER. - """ - self.thread.complete_remaining_upon_termination = complete_remaining_tasks - self.thread.terminate() - self.thread.join() - # thread closes the AMQP uplink for us diff --git a/coolamqp/connection/definition.py b/coolamqp/connection/definition.py index fe4695f2e5a49736638328cf057adf8871662407..f33cc9afaeb6e51cb024e930458eee51fd7088db 100644 --- a/coolamqp/connection/definition.py +++ b/coolamqp/connection/definition.py @@ -1,28 +1,47 @@ # coding=UTF-8 from __future__ import absolute_import, division, print_function - +import six class NodeDefinition(object): """ - Definition of a node + Definition of a reachable AMQP node. + + This object is hashable. """ - def __init__(self, host, port, user, password, virtual_host='/', heartbeat=None): + def __init__(self, *args, **kwargs): """ - All necessary information to establish a link to a broker. - - :param host: TCP host, str - :param port: TCP port, int - :param user: AMQP user - :param password: AMQP password - :param virtual_host: AMQP virtual host - :param amqp_version: AMQP protocol version + Create a cluster node definition. + + a = ClusterNode(host='192.168.0.1', user='admin', password='password', + virtual_host='vhost') + + or + + a = ClusterNode('192.168.0.1', 'admin', 'password') + + Additional keyword parameters that can be specified: + heartbeat - heartbeat interval in seconds + port - TCP port to use. Default is 5672 """ - self.user = user - self.password = password - self.host = host - self.port = port - self.virtual_host = virtual_host - self.heartbeat = heartbeat + self.heartbeat = kwargs.pop('heartbeat', None) + self.port = kwargs.pop('port', 5672) + + if len(kwargs) > 0: + # Prepare arguments for amqp.connection.Connection + self.host = kwargs['host'] + self.user = kwargs['user'] + self.password = kwargs['password'] + self.virtual_host = kwargs.get('virtual_host', '/') + elif len(args) == 3: + self.host, self.user, self.password = args + self.virtual_host = '/' + elif len(args) == 4: + self.host, self.user, self.password, self.virtual_host = args + else: + raise NotImplementedError #todo implement this + + def __str__(self): + return six.text_type(b'amqp://%s:%s@%s/%s'.encode('utf8') % (self.host, self.port, self.user, self.virtual_host)) diff --git a/coolamqp/handler.py b/coolamqp/handler.py deleted file mode 100644 index 1c32fde861bd7ba11eb1fc819d5fdbceb40189d9..0000000000000000000000000000000000000000 --- a/coolamqp/handler.py +++ /dev/null @@ -1,294 +0,0 @@ -# coding=UTF-8 -import threading -from six.moves import queue -import six -import logging -import collections -import time -from .backends import ConnectionFailedError, RemoteAMQPError, Cancelled -from .messages import Exchange -from .events import ConnectionUp, ConnectionDown, ConsumerCancelled, MessageReceived -from .orders import SendMessage, DeclareExchange, ConsumeQueue, CancelQueue, \ - AcknowledgeMessage, NAcknowledgeMessage, DeleteQueue, \ - DeleteExchange, SetQoS, DeclareQueue - -logger = logging.getLogger(__name__) - - -class _ImOuttaHere(Exception): - """Thrown upon thread terminating. - Thrown only if complete_remaining_upon_termination is False""" - - -class ClusterHandlerThread(threading.Thread): - """ - Thread that does bookkeeping for a Cluster. - """ - def __init__(self, cluster): - """ - :param cluster: coolamqp.Cluster - """ - threading.Thread.__init__(self) - - self.cluster = cluster - self.daemon = True # if you don't explicitly wait for me, that means you don't need to - self.is_terminating = False - self.complete_remaining_upon_termination = False - self.order_queue = collections.deque() # queue for inbound orders - self.event_queue = queue.Queue() # queue for tasks done - self.connect_id = -1 # connectID of current connection - - self.declared_exchanges = {} # declared exchanges, by their names - self.queues_by_consumer_tags = {} # tuple of (subbed queue, no_ack::bool), by consumer tags - - self.backend = None - self.first_connect = True - - self.qos = None # or tuple (prefetch_size, prefetch_count) if QoS set - - def _reconnect_attempt(self): - """Single attempt to regain connectivity. May raise ConnectionFailedError""" - self.backend = None - if self.backend is not None: - self.backend.shutdown() - self.backend = None - - self.connect_id += 1 - node = six.next(self.cluster.node_to_connect_to) - logger.info('Connecting to %s', node) - - self.backend = self.cluster.backend(node, self) - - if self.qos is not None: - pre_siz, pre_cou, glob = self.qos - self.backend.basic_qos(pre_siz, pre_cou, glob) - - for exchange in self.declared_exchanges.values(): - self.backend.exchange_declare(exchange) - - failed_queues = [] - for queue, no_ack in self.queues_by_consumer_tags.values(): - while True: - try: - self.backend.queue_declare(queue) - if queue.exchange is not None: - self.backend.queue_bind(queue, queue.exchange) - self.backend.basic_consume(queue, no_ack=no_ack) - logger.info('Consuming from %s no_ack=%s', queue, no_ack) - except RemoteAMQPError as e: - if e.code in (403, 405): # access refused, resource locked - # Ok, queue, what should we do? - if queue.locked_after_reconnect == 'retry': - time.sleep(0.1) - continue # retry until works - elif queue.locked_after_reconnect == 'cancel': - self.event_queue.put(ConsumerCancelled(queue, ConsumerCancelled.REFUSED_ON_RECONNECT)) - failed_queues.append(queue) - elif queue.locked_after_reconnect == 'defer': - self.order_queue.append(ConsumeQueue(queue, no_ack=no_ack)) - failed_queues.append(queue) - else: - raise Exception('wtf') - else: - raise # idk - break - - for failed_queue in failed_queues: - del self.queues_by_consumer_tags[failed_queue.consumer_tag] - - def _reconnect(self): - """Regain connectivity to cluster. May block for a very long time, - as it will not """ - exponential_backoff_delay = 1 - - while not self.cluster.connected: - try: - self._reconnect_attempt() - except ConnectionFailedError as e: - # a connection failure happened :( - logger.warning('Connecting failed due to %s while connecting and initial setup', repr(e)) - self.cluster.connected = False - if self.backend is not None: - self.backend.shutdown() - self.backend = None # good policy to release resources before you sleep - time.sleep(exponential_backoff_delay) - - if self.is_terminating and (not self.complete_remaining_upon_termination): - raise _ImOuttaHere() - - exponential_backoff_delay = min(60, exponential_backoff_delay * 2) - else: - logger.info('Connected to AMQP broker via %s', self.backend) - self.cluster.connected = True - self.event_queue.put(ConnectionUp(initial=self.first_connect)) - self.first_connect = False - - - def perform_order(self): - order = self.order_queue.popleft() - - try: - if order.cancelled: - logger.debug('Order %s was cancelled', order) - order._failed(Cancelled()) - return - - if isinstance(order, SendMessage): - self.backend.basic_publish(order.message, order.exchange, order.routing_key) - elif isinstance(order, SetQoS): - self.qos = order.qos - pre_siz, pre_cou, glob = order.qos - self.backend.basic_qos(pre_siz, pre_cou, glob) - elif isinstance(order, DeclareExchange): - self.backend.exchange_declare(order.exchange) - self.declared_exchanges[order.exchange.name] = order.exchange - elif isinstance(order, DeleteExchange): - self.backend.exchange_delete(order.exchange) - if order.exchange.name in self.declared_exchanges: - del self.declared_exchanges[order.exchange.name] - elif isinstance(order, DeclareQueue): - self.backend.queue_declare(order.queue) - elif isinstance(order, DeleteQueue): - self.backend.queue_delete(order.queue) - elif isinstance(order, ConsumeQueue): - if order.queue.consumer_tag in self.queues_by_consumer_tags: - order._completed() - return # already consuming, belay that - - self.backend.queue_declare(order.queue) - - if order.queue.exchange is not None: - self.backend.queue_bind(order.queue, order.queue.exchange) - - self.backend.basic_consume(order.queue, no_ack=order.no_ack) - self.queues_by_consumer_tags[order.queue.consumer_tag] = order.queue, order.no_ack - elif isinstance(order, CancelQueue): - try: - q, no_ack = self.queues_by_consumer_tags.pop(order.queue.consumer_tag) - except KeyError: - pass # wat? - else: - self.backend.basic_cancel(order.queue.consumer_tag) - self.event_queue.put(ConsumerCancelled(order.queue, ConsumerCancelled.USER_CANCEL)) - elif isinstance(order, AcknowledgeMessage): - if order.connect_id == self.connect_id: - self.backend.basic_ack(order.delivery_tag) - elif isinstance(order, NAcknowledgeMessage): - if order.connect_id == self.connect_id: - self.backend.basic_reject(order.delivery_tag) - except RemoteAMQPError as e: - logger.error('Remote AMQP error: %s', e) - order._failed(e) # we are allowed to go on - except ConnectionFailedError as e: - logger.error('Connection failed while %s: %s', order, e) - self.order_queue.appendleft(order) - raise - else: - order._completed() - - def __run_wrap(self): # throws _ImOuttaHere - # Loop while there are things to do - while (not self.is_terminating) or (len(self.order_queue) > 0): - try: - while len(self.order_queue) > 0: - self.perform_order() - - # just drain shit - self.backend.process(max_time=0.05) - except ConnectionFailedError as e: - logger.warning('Connection to broker lost: %s', e) - self.cluster.connected = False - self.event_queue.put(ConnectionDown()) - - # =========================== remove SendMessagees with discard_on_fail - my_orders = [] # because order_queue is used by many threads - while len(self.order_queue) > 0: - order = self.order_queue.popleft() - if isinstance(order, SendMessage): - if order.message.discard_on_fail: - order._discard() - continue - - my_orders.append(order) - - # Ok, we have them in order of execution. Append-left in reverse order - # to preserve previous order - for order in reversed(my_orders): - my_orders.appendleft(order) - - self._reconnect() - - def run(self): - try: - self._reconnect() - self.__run_wrap() - except _ImOuttaHere: - pass - - assert self.is_terminating - if self.cluster.connected or (self.backend is not None): - if self.backend is not None: - self.backend.shutdown() - self.backend = None - - self.cluster.connected = False - - def terminate(self): - """ - Called by Cluster. Tells to finish all jobs and quit. - Unacked messages will not be acked. If this is called, connection may die at any time. - """ - self.is_terminating = True - - ## events called - def _on_recvmessage(self, body, exchange_name, routing_key, delivery_tag, properties): - """ - Upon receiving a message - """ - from .messages import ReceivedMessage - - self.event_queue.put(MessageReceived(ReceivedMessage(body, self, - self.connect_id, - exchange_name, - routing_key, - properties, - delivery_tag=delivery_tag))) - - def _on_consumercancelled(self, consumer_tag): - """ - A consumer has been cancelled - """ - try: - queue, no_ack = self.queues_by_consumer_tags.pop(consumer_tag) - except KeyError: - return # what? - - self.event_queue.put(ConsumerCancelled(queue, ConsumerCancelled.BROKER_CANCEL)) - - ## methods to enqueue something into CHT to execute - - def _do_ackmessage(self, receivedMessage, on_completed=None): - """ - Order acknowledging a message. - :param receivedMessage: a ReceivedMessage object to ack - :param on_completed: callable/0 to call when acknowledgemenet succeeded - :return: an AcknowledgeMess - """ - a = AcknowledgeMessage(receivedMessage.connect_id, - receivedMessage.delivery_tag, - on_completed=on_completed) - self.order_queue.append(a) - return a - - - def _do_nackmessage(self, receivedMessage, on_completed=None): - """ - Order acknowledging a message. - :param receivedMessage: a ReceivedMessage object to ack - :param on_completed: callable/0 to call when acknowledgemenet succeeded - """ - a = NAcknowledgeMessage(receivedMessage.connect_id, - receivedMessage.delivery_tag, - on_completed=on_completed) - self.order_queue.append(a) - return a diff --git a/coolamqp/messages.py b/coolamqp/messages.py index 35c7b6896a77da2afff4d54ff9f8bd7f1862b417..a7900adb7e6d7034173cbd775bd8587826824994 100644 --- a/coolamqp/messages.py +++ b/coolamqp/messages.py @@ -84,12 +84,9 @@ Exchange.direct = Exchange() class Queue(object): """ This object represents a Queue that applications consume from or publish to. - - Caveat: Please note the locked_after_reconnect option in constructor """ - def __init__(self, name='', durable=False, exchange=None, exclusive=False, auto_delete=False, - locked_after_reconnect='retry'): + def __init__(self, name='', durable=False, exchange=None, exclusive=False, auto_delete=False): """ Create a queue definition. @@ -102,12 +99,6 @@ class Queue(object): :param exchange: Exchange for this queue to bind to. None for no binding. :param exclusive: Is this queue exclusive? :param auto_delete: Is this queue auto_delete ? - :param locked_after_reconnect: Behaviour when queue is exclusive and ACCESS_REFUSED/RESOURCE_LOCKED - is seen on reconnect. Because broker might not know that we have failed, 'retry' will - try again until succeeds (default option). This might block for a long time, until the broker - realizes previous connection is dead and deletes the queue. - 'cancel' will return a ConsumerCancelled to client - 'defer' will attempt to configure the queue later, but will not block other tasks from progressing. """ self.name = name # if name is '', this will be filled in with broker-generated name upon declaration @@ -119,5 +110,9 @@ class Queue(object): self.anonymous = name == '' # if this queue is anonymous, it must be regenerated upon reconnect self.consumer_tag = name if name != '' else uuid.uuid4().hex # consumer tag to use in AMQP comms - self.locked_after_reconnect = locked_after_reconnect - assert locked_after_reconnect in ('retry', 'cancel', 'defer') \ No newline at end of file + + def __eq__(self, other): + return self.name == other.name + + def __hash__(self): + return hash(self.name) diff --git a/coolamqp/uplink/__init__.py b/coolamqp/uplink/__init__.py index 56ef2cf3f8c87073672882ec2f681328fc2054da..428f48aa4c4b6e9ca9e697a815066894544bfd58 100644 --- a/coolamqp/uplink/__init__.py +++ b/coolamqp/uplink/__init__.py @@ -13,3 +13,4 @@ from __future__ import absolute_import, division, print_function from coolamqp.uplink.connection import Connection, HeaderOrBodyWatch, MethodWatch, AnyWatch from coolamqp.uplink.listener import ListenerThread +from coolamqp.uplink.handshake import PUBLISHER_CONFIRMS, CONSUMER_CANCEL_NOTIFY diff --git a/coolamqp/uplink/handshake.py b/coolamqp/uplink/handshake.py index 87cd6dfc97ea19c5694b4300b1772b466102fdd3..c35423ff52e3e63edd886fc7560c503df2dc796a 100644 --- a/coolamqp/uplink/handshake.py +++ b/coolamqp/uplink/handshake.py @@ -9,9 +9,13 @@ from coolamqp.framing.definitions import ConnectionStart, ConnectionStartOk, \ from coolamqp.framing.frames import AMQPMethodFrame from coolamqp.uplink.connection.states import ST_ONLINE + +PUBLISHER_CONFIRMS = b'publisher_confirms' +CONSUMER_CANCEL_NOTIFY = b'consumer_cancel_notify' + SUPPORTED_EXTENSIONS = [ - b'publisher_confirms', - b'consumer_cancel_notify' + PUBLISHER_CONFIRMS, + CONSUMER_CANCEL_NOTIFY ] CLIENT_DATA = [ @@ -32,8 +36,7 @@ class Handshaker(object): Object that given a connection rolls the handshake. """ - def __init__(self, connection, node_definition, - on_success): + def __init__(self, connection, node_definition, on_success): """ :param connection: Connection instance to use :type node_definition: NodeDefinition @@ -74,6 +77,7 @@ class Handshaker(object): server_props = dict(payload.server_properties) if b'capabilities' in server_props: for label, fv in server_props[b'capabilities'][0]: + print('Detected extension: %s' % (label, )) if label in SUPPORTED_EXTENSIONS: if fv[0]: self.connection.extensions.append(label) diff --git a/coolamqp/uplink/listener/__init__.py b/coolamqp/uplink/listener/__init__.py index 897550a7d0f77da9bc56f64d839b53aa40446792..8dd81521b1f146442f610685d1cf9759bc5e4f2c 100644 --- a/coolamqp/uplink/listener/__init__.py +++ b/coolamqp/uplink/listener/__init__.py @@ -16,4 +16,3 @@ from __future__ import absolute_import, division, print_function from coolamqp.uplink.listener.thread import ListenerThread -from coolamqp.uplink.connection import Connection diff --git a/tests/run.py b/tests/run.py index d1fca1139a083f15d894f0c7e62e6cd608f13c6b..3db999c25c0ee1ba23d14bd81b72529165fc40c2 100644 --- a/tests/run.py +++ b/tests/run.py @@ -6,11 +6,11 @@ from coolamqp.connection import NodeDefinition from coolamqp.uplink import Connection import logging -from coolamqp.attaches import Consumer +from coolamqp.attaches import Consumer, Publisher, MODE_NOACK, MODE_CNPUB from coolamqp.messages import Queue -NODE = NodeDefinition('127.0.0.1', 5672, 'user', 'user', heartbeat=5) +NODE = NodeDefinition('127.0.0.1', 'user', 'user', heartbeat=20) logging.basicConfig(level=logging.DEBUG) if __name__ == '__main__': @@ -23,6 +23,12 @@ if __name__ == '__main__': cons = Consumer(Queue('siema-eniu'), no_ack=False) cons.attach(con) + pub1 = Publisher(MODE_NOACK) + pub2 = Publisher(MODE_CNPUB) + + pub1.attach(con) + pub2.attach(con) + while True: time.sleep(10) diff --git a/tests/test_attaches/__init__.py b/tests/test_attaches/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..1c762b12fd99adc2f7d4e5137c5b872079457510 --- /dev/null +++ b/tests/test_attaches/__init__.py @@ -0,0 +1,8 @@ +# coding=UTF-8 +from __future__ import print_function, absolute_import, division +import six +import logging + +logger = logging.getLogger(__name__) + + diff --git a/tests/test_attaches/test_utils.py b/tests/test_attaches/test_utils.py new file mode 100644 index 0000000000000000000000000000000000000000..68711ab2fd87669efe9c5cfbf779e1f0d758058d --- /dev/null +++ b/tests/test_attaches/test_utils.py @@ -0,0 +1,62 @@ +# coding=UTF-8 +""" +It sounds like a melody +""" +from __future__ import print_function, absolute_import, division +import six +import unittest + + +from coolamqp.attaches.utils import ManualConfirmableRejectable, AtomicTagger + + +class TestAtomicTagger(unittest.TestCase): + + def test_insertionOrder(self): + at = AtomicTagger() + + a1 = at.get_key() + a2 = at.get_key() + a3 = at.get_key() + + at.deposit(a1, b'ABC') + at.deposit(a3, b'GHI') + at.deposit(a2, b'DEF') + + self.assertEquals(at.tags[0][1], b'ABC') + self.assertEquals(at.tags[1][1], b'DEF') + self.assertEquals(at.tags[2][1], b'GHI') + + def test_1(self): + + at = AtomicTagger() + + a1 = at.get_key() + a2 = at.get_key() + a3 = at.get_key() + + n1 = at.get_key() + n2 = at.get_key() + n3 = at.get_key() + + P = {'acked_P': False, 'nacked_P': False, 'acked_N': False, 'nacked_N': False} + + def assigner(nam, val=True): + def x(): + P[nam] = val + return x + + + at.deposit(a2, ManualConfirmableRejectable(assigner('acked_P'), assigner('nacked_P'))) + at.deposit(n2, ManualConfirmableRejectable(assigner('acked_N'), assigner('nacked_N'))) + + print(at.tags) + + at.ack(a3, True) + at.nack(n3, True) + + self.assertTrue(P['acked_P'] and (not P['nacked_P']) and P['nacked_N'] and (not P['acked_N'])) + + + +