diff --git a/coolamqp/attaches/channeler.py b/coolamqp/attaches/channeler.py index 1b4b4392dea89af1540ff29638ea268e3200c3ea..4eb229d16067f198351aea2d6fe2b6afc1bc1a0e 100644 --- a/coolamqp/attaches/channeler.py +++ b/coolamqp/attaches/channeler.py @@ -13,7 +13,6 @@ from coolamqp.framing.definitions import ChannelOpen, ChannelOpenOk, BasicConsum from coolamqp.uplink import HeaderOrBodyWatch, MethodWatch - ST_OFFLINE = 0 # Consumer is *not* consuming, no setup attempts are being made ST_SYNCING = 1 # A process targeted at consuming has been started ST_ONLINE = 2 # Consumer is declared all right diff --git a/coolamqp/attaches/publisher.py b/coolamqp/attaches/publisher.py index 6dc75e99c28e6e529ae585d60f0da170c7fc61a8..b366c341bede731432ef079d6a7e8d7bfdd8f219 100644 --- a/coolamqp/attaches/publisher.py +++ b/coolamqp/attaches/publisher.py @@ -9,20 +9,25 @@ If you use a broker that doesn't support these, just don't use MODE_CNPUB. CoolA to check with the broker beforehand. """ from __future__ import absolute_import, division, print_function -import six -import warnings -import logging + import collections -from coolamqp.framing.definitions import ChannelOpenOk +import logging +import warnings + +from coolamqp.framing.definitions import ChannelOpenOk, BasicPublish, Basic, BasicAck +from coolamqp.framing.frames import AMQPMethodFrame, AMQPBodyFrame, AMQPHeaderFrame try: # these extensions will be available - from coolamqp.framing.definitions import ConfirmSelect, ConfirmSelectOk + from coolamqp.framing.definitions import ConfirmSelect, ConfirmSelectOk, BasicNack except ImportError: pass from coolamqp.attaches.channeler import Channeler, ST_ONLINE, ST_OFFLINE -from coolamqp.uplink import PUBLISHER_CONFIRMS +from coolamqp.uplink import PUBLISHER_CONFIRMS, MethodWatch +from coolamqp.attaches.utils import AtomicTagger, FutureConfirmableRejectable + +from coolamqp.futures import Future logger = logging.getLogger(__name__) @@ -31,6 +36,11 @@ MODE_NOACK = 0 MODE_CNPUB = 1 # this will be available only if suitable extensions were used +# for holding messages when MODE_CNPUB and link is down +CnpubMessageSendOrder = collections.namedtuple('CnpubMessageSendOrder', ('message', 'exchange_name', + 'routing_key', 'future')) + + class Publisher(Channeler): """ An object that is capable of sucking into a Connection and sending messages. @@ -67,52 +77,101 @@ class Publisher(Channeler): # tuple of (Message object, exchange name::str, routing_key::str, # Future to confirm or None, flags as tuple|empty tuple - self.delivery_tag = 0 # next delivery tag + self.tagger = None # None, or AtomicTagger instance id MODE_CNPUB + def _pub(self, message, exchange_name, routing_key): + """ + Just send the message. Sends BasicDeliver + header + body + :param message: Message instance + :param exchange_name: exchange to use + :param routing_key: routing key to use + :type exchange_name: bytes + :param routing_key: bytes + """ + # Break down large bodies + bodies = [] + + body = buffer(message.body) + max_body_size = self.connection.frame_max - AMQPBodyFrame.FRAME_SIZE_WITHOUT_PAYLOAD + while len(body) > 0: + bodies.append(buffer(body, 0, max_body_size)) + body = buffer(body, max_body_size) + + self.connection.send([ + AMQPMethodFrame(self.channel_id, BasicPublish(exchange_name, routing_key, False, False)), + AMQPHeaderFrame(self.channel_id, Basic.INDEX, 0, len(message.body), message.properties) + ]) + + # todo optimize it - if there's only one frame it can with previous send + for body in bodies: + self.connection.send([AMQPBodyFrame(self.channel_id, body)]) + + def _mode_cnpub_process_deliveries(self): + """ + Dispatch all frames that are waiting to be sent + + To be used when mode is MODE_CNPUB and we just got ST_ONLINE + """ + assert self.state == ST_ONLINE + assert self.mode == MODE_CNPUB + + while len(self.messages) > 0: + msg, xchg, rk, fut = self.messages.popleft() + + self.tagger.deposit(self.tagger.get_key(), FutureConfirmableRejectable(fut)) + self._pub(msg, xchg, rk) + + def _on_cnpub_delivery(self, payload): + """ + This gets called on BasicAck and BasicNack, if mode is MODE_CNPUB + """ + assert self.mode == MODE_CNPUB + + print('Got %s with dt=%s' % (payload, payload.delivery_tag)) + + if isinstance(payload, BasicAck): + self.tagger.ack(payload.delivery_tag, payload.multiple) + elif isinstance(payload, BasicNack): + self.tagger.nack(payload.delivery_tag, payload.multiple) def publish(self, message, exchange_name=b'', routing_key=b''): """ Schedule to have a message published. + If mode is MODE_CNPUB: + this function will return a Future. Future can end either with success (result will be None), + or exception (a plain Exception instance). Exception will happen when broker NACKs the message: + that, according to RabbitMQ, means an internal error in Erlang process. + + If mode is MODE_NOACK: + this function returns None. Messages are dropped on the floor if there's no connection. + :param message: Message object to send :param exchange_name: exchange name to use. Default direct exchange by default :param routing_key: routing key to use - :return: a Future object symbolizing delivering the message to AMQP (or any else guarantees publisher mode - will make). - This is None when mode is noack + :return: a Future instance, or None """ # Formulate the request if self.mode == MODE_NOACK: - # If we are not connected right now, drop the message on the floor and log it with DEBUG if self.state != ST_ONLINE: logger.debug(u'Publish request, but not connected - dropping the message') else: - # Dispatch! - pass + self._pub(message, exchange_name, routing_key) + elif self.mode == MODE_CNPUB: + fut = Future() + #todo can optimize this not to create an object if ST_ONLINE already + cnpo = CnpubMessageSendOrder(message, exchange_name, routing_key, fut) + self.messages.append(cnpo) - self.messages.append(( - message, - exchange_name, - routing_key, - None, - () - )) - else: - fut = u'banana banana banana' - self.messages.append(( - message, - exchange_name, - routing_key, - fut - )) - return fut + if self.state == ST_ONLINE: + self._mode_cnpub_process_deliveries() - # Attempt dispatching messages as possible - if self.mode == MODE_NOACK: - pass + return fut + else: + raise Exception(u'Invalid mode') def on_setup(self, payload): @@ -142,5 +201,11 @@ class Publisher(Channeler): self.state = ST_ONLINE self.on_operational(True) + self.tagger = AtomicTagger() + # now we need to listen for BasicAck and BasicNack + mw = MethodWatch(self.channel_id, (BasicAck, BasicNack), self._on_cnpub_delivery) + mw.oneshot = False + self.connection.watch(mw) + self._mode_cnpub_process_deliveries() diff --git a/coolamqp/attaches/utils.py b/coolamqp/attaches/utils.py index 25dd9c6a8bb9d7fe5701a13df624c5bc798b27d5..5ea2981eebdc8c6865b9d311506f5a4e893b0016 100644 --- a/coolamqp/attaches/utils.py +++ b/coolamqp/attaches/utils.py @@ -25,23 +25,19 @@ class ConfirmableRejectable(object): :return: don't care """ -class ManualConfirmableRejectable(ConfirmableRejectable): +class FutureConfirmableRejectable(ConfirmableRejectable): """ - A callback-based way to create ConfirmableRejectable objects + A ConfirmableRejectable that can result a future (with None), + or Exception it with a message """ - def __init__(self, on_ack, on_nack): - """ - :param on_ack: callable/0, will be called on .confirm - :param on_nack: callable/0, will be called on .reject - """ - self.on_ack = on_ack - self.on_nack = on_nack + def __init__(self, future): + self.future = future def confirm(self): - self.on_ack() + self.future.set_result() def reject(self): - self.on_nack() + self.future.set_exception(Exception()) class AtomicTagger(object): diff --git a/coolamqp/framing/compilation/content_property.py b/coolamqp/framing/compilation/content_property.py index df6b963a7d9f94885b2f9fdeac18cd652bee6e55..345691afa06c2ec78f05a07022d24aa679150765 100644 --- a/coolamqp/framing/compilation/content_property.py +++ b/coolamqp/framing/compilation/content_property.py @@ -63,9 +63,12 @@ def _compile_particular_content_property_list_class(zpf, fields): """ # A value for property flags that is used, assuming all bit fields are FALSE (0) ZERO_PROPERTY_FLAGS = %s +''' % (x, )) + if len(present_fields) > 0: + mod.append(u''' def __init__(self, %s): -''' % (x, u', '.join(format_field_name(field.name) for field in present_fields))) +''' % (u', '.join(format_field_name(field.name) for field in present_fields))) for field in present_fields: mod.append(u' self.%s = %s\n'.replace(u'%s', format_field_name(field.name))) diff --git a/coolamqp/framing/frames.py b/coolamqp/framing/frames.py index d842d37297fd10ede76ff2f4674063bab171dfdd..f45fe5ee263384d9290364a00770ecd440cc21ae 100644 --- a/coolamqp/framing/frames.py +++ b/coolamqp/framing/frames.py @@ -5,6 +5,7 @@ Concrete frame definitions from __future__ import absolute_import, division, print_function import struct +import six from coolamqp.framing.base import AMQPFrame from coolamqp.framing.definitions import FRAME_METHOD, FRAME_HEARTBEAT, FRAME_BODY, FRAME_HEADER, FRAME_END, \ @@ -88,11 +89,14 @@ class AMQPHeaderFrame(AMQPFrame): class AMQPBodyFrame(AMQPFrame): FRAME_TYPE = FRAME_BODY + FRAME_SIZE_WITHOUT_PAYLOAD = 8 + def __init__(self, channel, data): """ :type data: binary """ AMQPFrame.__init__(self, channel) + assert isinstance(data, (six.binary_type, buffer, memoryview)) self.data = data def write_to(self, buf): diff --git a/coolamqp/state/orders.py b/coolamqp/futures.py similarity index 60% rename from coolamqp/state/orders.py rename to coolamqp/futures.py index 906cca522656e384e8069f976c277709ea2819f6..661d2e7c352f1c733af58d44ad85de8826a7f140 100644 --- a/coolamqp/state/orders.py +++ b/coolamqp/futures.py @@ -8,10 +8,9 @@ import logging logger = logging.getLogger(__name__) -class BaseOrder(concurrent.futures.Future): +class Future(concurrent.futures.Future): """ - A strange future - only one thread may .wait() for it. - And it's for the best. + A strange future (only one thread may wait for it) """ def __init__(self): self.lock = threading.Lock() @@ -19,7 +18,7 @@ class BaseOrder(concurrent.futures.Future): self.completed = False self.successfully = None - self.result = None + self._result = None self.cancelled = False self.running = True @@ -28,13 +27,33 @@ class BaseOrder(concurrent.futures.Future): def add_done_callback(self, fn): self.callables.append(fn) + def result(self, timeout=None): + assert timeout is None, u'Non-none timeouts not supported' + self.lock.acquire() + + if self.completed: + if self.successfully: + return self._result + else: + raise self._result + else: + if self.cancelled: + raise concurrent.futures.CancelledError() + else: + # it's invalid to release the lock, not do the future if it's not cancelled + raise RuntimeError(u'Invalid state!') + def cancel(self): + """ + When cancelled, future will attempt not to complete (completed=False). + :return: + """ self.cancelled = True def __finish(self, result, successful): self.completed = True self.successfully = successful - self.result = result + self._result = result self.lock.release() for callable in self.callables: @@ -50,22 +69,3 @@ class BaseOrder(concurrent.futures.Future): def set_exception(self, exception): self.__finish(exception, False) - - -class SendMessage(BaseOrder): - """ - An order to send a message somewhere, such that message will survive disconnects - from broker and so on. - """ - def __init__(self, message, exchange_name, routing_key): - """ - :param message: Message object to send - :param exchange_name: bytes, name of exchange - :param routing_key: bytes, routing key - """ - BaseOrder.__init__(self) - - self.message = message - self.exchange_name = exchange_name - self.routing_key = routing_key - diff --git a/coolamqp/messages.py b/coolamqp/messages.py index a7900adb7e6d7034173cbd775bd8587826824994..380f0bce746e41d2b50920316abf92262266c2a9 100644 --- a/coolamqp/messages.py +++ b/coolamqp/messages.py @@ -2,6 +2,14 @@ import uuid import six +from coolamqp.framing.definitions import BasicContentPropertyList as MessageProperties + + +__all__ = ('Message', 'ReceivedMessage', 'MessageProperties') + + +EMPTY_PROPERTIES = MessageProperties() + class Message(object): """AMQP message object""" @@ -14,12 +22,13 @@ class Message(object): :param body: stream of octets :type body: str (py2) or bytes (py3) - :param properties: AMQP properties to be sent along + :param properties: AMQP properties to be sent along. + default is 'no properties at all' """ if isinstance(body, six.text_type): raise TypeError('body cannot be a text type!') self.body = six.binary_type(body) - self.properties = properties or {} + self.properties = properties or EMPTY_PROPERTIES LAMBDA_NONE = lambda: None diff --git a/coolamqp/state/__init__.py b/coolamqp/state/__init__.py deleted file mode 100644 index 2e65b9091f536371e7b6fa2a03590c2c3e286a63..0000000000000000000000000000000000000000 --- a/coolamqp/state/__init__.py +++ /dev/null @@ -1,12 +0,0 @@ -# coding=UTF-8 -from __future__ import absolute_import, division, print_function -""" -Any operations the user does, are against the BROKER STATE. - -The connections are not important - what is important, is the broker state. This is a virtual -aggregate of all operations that are running against the cluster - ie. queues subscribed, -messages pending to be sent. - -The client doesn't matter how this is handled. CoolAMQP (in the future) may decide to subscribe some -queues against other node, if it decides that the master queue is there, or do something entirely else. -""" \ No newline at end of file diff --git a/coolamqp/state/state.py b/coolamqp/state/state.py deleted file mode 100644 index cc223ca446a78dfc620f3218ecf59ff9385e0a3d..0000000000000000000000000000000000000000 --- a/coolamqp/state/state.py +++ /dev/null @@ -1,22 +0,0 @@ -# coding=UTF-8 -from __future__ import absolute_import, division, print_function -import concurrent.futures - - -class BrokerState(object): - """ - A state of the broker. List of messages to send (including their dispositions) - and so on. - """ - - def __init__(self): - - # Transient messages - THESE WILL BE DROPPED ON THE FLOOR UPON A DISCONNECT - self.messages_to_push = [] # list of (Message object, exchange_name, routing_key) - - # Messages to publish - THESE WILL NOT BE DROPPED ON THE FLOOR UPON A DC - self.messages_to_tx = [] # list of (SendMessage object) - - # a list of (Queue instance, delivery_tag, auto_ack) - self.queues_subscribed = [] - diff --git a/coolamqp/uplink/connection/connection.py b/coolamqp/uplink/connection/connection.py index 278fde553714857ec2dfb9d398c7e68fe73a2cbc..79c8d831ca9bb4e5b4b2f2a51fbe3c207f803287 100644 --- a/coolamqp/uplink/connection/connection.py +++ b/coolamqp/uplink/connection/connection.py @@ -144,15 +144,20 @@ class Connection(object): def send(self, frames, priority=False): """ + Schedule to send some frames. + + Take care: This won't stop you from sending frames larger tham frame_max. + Broker will probably close the connection if he sees that. + :param frames: list of frames or None to close the link :param reason: optional human-readable reason for this action """ if frames is not None: - # for frame in frames: - # if isinstance(frame, AMQPMethodFrame): - # print('Sending ', frame.payload) - # else: - # print('Sending ', frame) + for frame in frames: + if isinstance(frame, AMQPMethodFrame): + print('Sending ', frame.payload) + else: + print('Sending ', frame) self.sendf.send(frames, priority=priority) else: # Listener socket will kill us when time is right @@ -169,10 +174,10 @@ class Connection(object): :param frame: AMQPFrame that was received """ - # if isinstance(frame, AMQPMethodFrame): # temporary, for debugging - # print('RECEIVED', frame.payload.NAME) - # else: - # print('RECEIVED ', frame) + if isinstance(frame, AMQPMethodFrame): # temporary, for debugging + print('RECEIVED', frame.payload.NAME) + else: + print('RECEIVED ', frame) watch_handled = False # True if ANY watch handled this diff --git a/requirements.txt b/requirements.txt index 5fecbcdb185943b6108d6983bf76d394d392c227..e791b2aada9c24c75598436e3e21008bda714327 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,4 @@ amqp six monotonic +futures diff --git a/tests/run.py b/tests/run.py index 3db999c25c0ee1ba23d14bd81b72529165fc40c2..a54aab3771946ea00301d33fd6054efe318e4fed 100644 --- a/tests/run.py +++ b/tests/run.py @@ -1,13 +1,14 @@ # coding=UTF-8 from __future__ import absolute_import, division, print_function from coolamqp.uplink import ListenerThread -import time +import time, logging, threading +from coolamqp.messages import Message, MessageProperties from coolamqp.connection import NodeDefinition from coolamqp.uplink import Connection -import logging from coolamqp.attaches import Consumer, Publisher, MODE_NOACK, MODE_CNPUB from coolamqp.messages import Queue +import time NODE = NodeDefinition('127.0.0.1', 'user', 'user', heartbeat=20) @@ -20,14 +21,23 @@ if __name__ == '__main__': con = Connection(NODE, lt) con.start() - cons = Consumer(Queue('siema-eniu'), no_ack=False) + cons = Consumer(Queue('siema-eniu'), no_ack=True) cons.attach(con) - pub1 = Publisher(MODE_NOACK) - pub2 = Publisher(MODE_CNPUB) - pub1.attach(con) - pub2.attach(con) + class IPublishThread(threading.Thread): + def __init__(self): + super(IPublishThread, self).__init__() + self.daemon = True + + def run(self): + pub2 = Publisher(MODE_NOACK) + pub2.attach(con) + while True: + pub2.publish(Message(b'you dawg'), routing_key=b'siema-eniu') + + + IPublishThread().start() while True: time.sleep(10) diff --git a/tests/test_attaches/__init__.py b/tests/test_attaches/__init__.py deleted file mode 100644 index 1c762b12fd99adc2f7d4e5137c5b872079457510..0000000000000000000000000000000000000000 --- a/tests/test_attaches/__init__.py +++ /dev/null @@ -1,8 +0,0 @@ -# coding=UTF-8 -from __future__ import print_function, absolute_import, division -import six -import logging - -logger = logging.getLogger(__name__) - - diff --git a/tests/test_attaches/test_utils.py b/tests/test_attaches/test_utils.py deleted file mode 100644 index 68711ab2fd87669efe9c5cfbf779e1f0d758058d..0000000000000000000000000000000000000000 --- a/tests/test_attaches/test_utils.py +++ /dev/null @@ -1,62 +0,0 @@ -# coding=UTF-8 -""" -It sounds like a melody -""" -from __future__ import print_function, absolute_import, division -import six -import unittest - - -from coolamqp.attaches.utils import ManualConfirmableRejectable, AtomicTagger - - -class TestAtomicTagger(unittest.TestCase): - - def test_insertionOrder(self): - at = AtomicTagger() - - a1 = at.get_key() - a2 = at.get_key() - a3 = at.get_key() - - at.deposit(a1, b'ABC') - at.deposit(a3, b'GHI') - at.deposit(a2, b'DEF') - - self.assertEquals(at.tags[0][1], b'ABC') - self.assertEquals(at.tags[1][1], b'DEF') - self.assertEquals(at.tags[2][1], b'GHI') - - def test_1(self): - - at = AtomicTagger() - - a1 = at.get_key() - a2 = at.get_key() - a3 = at.get_key() - - n1 = at.get_key() - n2 = at.get_key() - n3 = at.get_key() - - P = {'acked_P': False, 'nacked_P': False, 'acked_N': False, 'nacked_N': False} - - def assigner(nam, val=True): - def x(): - P[nam] = val - return x - - - at.deposit(a2, ManualConfirmableRejectable(assigner('acked_P'), assigner('nacked_P'))) - at.deposit(n2, ManualConfirmableRejectable(assigner('acked_N'), assigner('nacked_N'))) - - print(at.tags) - - at.ack(a3, True) - at.nack(n3, True) - - self.assertTrue(P['acked_P'] and (not P['nacked_P']) and P['nacked_N'] and (not P['acked_N'])) - - - -