diff --git a/coolamqp/attaches/consumer.py b/coolamqp/attaches/consumer.py index 119ff54bcb0abc80c6c06e2fa4879b5ceccfd854..7f628cd2fd100eeb88bfaf275d24f60b735fd6d3 100644 --- a/coolamqp/attaches/consumer.py +++ b/coolamqp/attaches/consumer.py @@ -1,6 +1,6 @@ # coding=UTF-8 from __future__ import absolute_import, division, print_function -import uuid +import six from coolamqp.framing.frames import AMQPMethodFrame, AMQPBodyFrame, AMQPHeaderFrame from coolamqp.framing.definitions import ChannelOpen, ChannelOpenOk, BasicConsume, \ BasicConsumeOk, QueueDeclare, QueueDeclareOk, ExchangeDeclare, ExchangeDeclareOk, \ @@ -14,11 +14,11 @@ ST_SYNCING = 1 # A process targeted at consuming has been started ST_ONLINE = 2 # Consumer is declared all right -EV_ONLINE = 0 # called upon consumer being online and consuming -EV_CANCEL = 1 # consumer has been cancelled by BasicCancel. +EV_ONLINE = 7 # called upon consumer being online and consuming +EV_CANCEL = 8 # consumer has been cancelled by BasicCancel. # EV_OFFLINE will follow immediately -EV_OFFLINE = 2 # channel down -EV_MESSAGE = 3 # received a message +EV_OFFLINE = 9 # channel down +EV_MESSAGE = 10 # received a message class Consumer(object): """ @@ -72,26 +72,32 @@ class Consumer(object): arg={EV_ONLINE} called directly after setup. self.state is not yet set! args={EV_CANCEL} seen a RabbitMQ consumer cancel notify. EV_OFFLINE follows args={EV_OFFLINE} sent when a channel is no longer usable. It may not yet have been torn down. + this will be called only if EV_ONLINE was previously dispatched arg={EV_MESSAGE} """ + assert event in (EV_OFFLINE, EV_CANCEL, EV_MESSAGE, EV_ONLINE) + if event == EV_OFFLINE and (self.state is not ST_ONLINE): return # No point in processing that if event == EV_ONLINE: + print('Entering ST_ONLINE') self.state = ST_ONLINE assert self.receiver is None self.receiver = MessageReceiver(self) elif event == EV_OFFLINE: + self.receiver.on_gone() self.receiver = None def __stateto(self, st): """if st is not current state, statify it. As an extra if it's a transition to ST_OFFLINE, send an event""" - if (self.state != ST_OFFLINE) and (st == ST_OFFLINE): - self.on_event(ST_OFFLINE) - else: - self.state = st + if (self.state == ST_ONLINE) and (st == ST_OFFLINE): + # the only moment when EV_OFFLINE is generated + self.on_event(EV_OFFLINE) + + self.state = st def on_close(self, payload=None): """ @@ -105,6 +111,8 @@ class Consumer(object): should_retry = False release_channel = False + print('HAYWIRE ', payload) + if isinstance(payload, BasicCancel): # Consumer Cancel Notification - by RabbitMQ self.on_event(EV_CANCEL) @@ -292,12 +300,50 @@ class MessageReceiver(object): self.state = 0 # 0 - waiting for Basic-Deliver # 1 - waiting for Header # 2 - waiting for Body [all] + # 3 - gone! self.bdeliver = None # payload of Basic-Deliver self.header = None # AMQPHeaderFrame self.body = [] # list of payloads self.data_to_go = None # set on receiving header, how much bytes we need yet + self.acks_pending = set() # list of things to ack/reject + + def on_gone(self): + """Called by Consumer to inform upon discarding this receiver""" + self.state = 3 + + def confirm(self, delivery_tag, success): + """ + This crafts a constructor for confirming messages. + + This should return a callable/0, whose calling will ACK or REJECT the message. + Calling it multiple times should have no ill effect. + + If this receiver is long gone, + + :param delivery_tag: delivery_tag to ack + :param success: True if ACK, False if REJECT + :return: callable/0 + """ + + def callable(): + if self.state == 3: + return # Gone! + + if delivery_tag not in self.acks_pending: + return # already confirmed/rejected + + if success: + self.consumer.connection.send([AMQPMethodFrame(self.consumer.channel_id, + BasicAck(delivery_tag, False))]) + else: + self.consumer.connection.send([AMQPMethodFrame(self.consumer.channel_id, + BasicReject(delivery_tag, True))]) + + return callable + + def on_head(self, frame): assert self.state == 1 self.header = frame @@ -310,13 +356,37 @@ class MessageReceiver(object): self.state = 1 def on_body(self, payload): + """:type payload: buffer""" assert self.state == 2 self.body.append(payload) self.data_to_go -= len(payload) assert self.data_to_go >= 0 if self.data_to_go == 0: + ack_expected = not self.consumer.no_ack + # Message A-OK! - print('Yo, got a message of %s' % (u''.join(map(str, self.body)))) + + if ack_expected: + self.acks_pending.add(self.bdeliver.delivery_tag) + + from coolamqp.messages import ReceivedMessage + rm = ReceivedMessage( + b''.join(map(six.binary_type, self.body)), #todo inefficient as FUUUCK + self.bdeliver.exchange, + self.bdeliver.routing_key, + self.header.properties, + self.bdeliver.delivery_tag, + None if self.consumer.no_ack else self.confirm(self.bdeliver.delivery_tag, True), + None if self.consumer.no_ack else self.confirm(self.bdeliver.delivery_tag, False), + ) + +# print('hello seal - %s\nctype: %s\ncencod: %s\n' % (rm.body, +# rm.properties.__dict__.get('content_type', b'<EMPTY>'), +# rm.properties.__dict__.get('content_encoding', b'<EMPTY>'))) + + if ack_expected: + rm.ack() + self.state = 0 # at this point it's safe to clear the body diff --git a/coolamqp/framing/base.py b/coolamqp/framing/base.py index 0bc471b2a7215379b218decc12d7bdbbca9d5446..3ee0f7a13b4f537482d26cd7a3fdca3ad69ac6cc 100644 --- a/coolamqp/framing/base.py +++ b/coolamqp/framing/base.py @@ -11,18 +11,18 @@ AMQP_HELLO_HEADER = b'AMQP\x00\x00\x09\x01' # name => (length|None, struct ID|None, reserved-field-value : for struct if structable, bytes else, length of default) -BASIC_TYPES = {'bit': (None, None, "0", None), # special case - 'octet': (1, 'B', "b'\\x00'", 1), - 'short': (2, 'H', "b'\\x00\\x00'", 2), - 'long': (4, 'I', "b'\\x00\\x00\\x00\\x00'", 4), - 'longlong': (8, 'Q', "b'\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00'", 8), - 'timestamp': (8, 'Q', "b'\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00'", 8), - 'table': (None, None, "b'\\x00\\x00\\x00\\x00'", 4), # special case - 'longstr': (None, None, "b'\\x00\\x00\\x00\\x00'", 4), # special case - 'shortstr': (None, None, "b'\\x00'", 1), # special case +BASIC_TYPES = {u'bit': (None, None, "0", None), # special case + u'octet': (1, 'B', "b'\\x00'", 1), + u'short': (2, 'H', "b'\\x00\\x00'", 2), + u'long': (4, 'I', "b'\\x00\\x00\\x00\\x00'", 4), + u'longlong': (8, 'Q', "b'\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00'", 8), + u'timestamp': (8, 'Q', "b'\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00'", 8), + u'table': (None, None, "b'\\x00\\x00\\x00\\x00'", 4), # special case + u'longstr': (None, None, "b'\\x00\\x00\\x00\\x00'", 4), # special case + u'shortstr': (None, None, "b'\\x00'", 1), # special case } -DYNAMIC_BASIC_TYPES = ('table', 'longstr', 'shortstr') +DYNAMIC_BASIC_TYPES = (u'table', u'longstr', u'shortstr') class AMQPFrame(object): # base class for framing diff --git a/coolamqp/framing/compilation/compile_definitions.py b/coolamqp/framing/compilation/compile_definitions.py index 9c67c709046bc74b8550f9a42cf36ae284b68eeb..33ac881f7203505031238a3b8a5f515e277e2fa4 100644 --- a/coolamqp/framing/compilation/compile_definitions.py +++ b/coolamqp/framing/compilation/compile_definitions.py @@ -282,7 +282,7 @@ Field = collections.namedtuple('Field', ('name', 'type', 'basic_type', 'reserved """ # extract property flags pfl = 2 - while ord(buf[offset + pfl]) & 1: + while ord(buf[offset + pfl - 1]) & 1: pfl += 2 zpf = %s.zero_property_flags(buf[offset:offset+pfl]) if zpf in %s.PARTICULAR_CLASSES: @@ -403,7 +403,7 @@ Field = collections.namedtuple('Field', ('name', 'type', 'basic_type', 'reserved offset = start_offset ''') - line(get_from_buffer(method.fields, '', 2)) + line(get_from_buffer(method.fields, '', 2, remark=(method.name == 'deliver'))) line(" return %s(%s)", full_class_name, u', '.join(format_field_name(field.name) for field in method.fields if not field.reserved)) diff --git a/coolamqp/framing/compilation/textcode_fields.py b/coolamqp/framing/compilation/textcode_fields.py index e8fb3c8dd45a5bee0658df205aff8ba46453cdba..3a1722031e875d33ce39bb86cfc4dd9ba9ec85a0 100644 --- a/coolamqp/framing/compilation/textcode_fields.py +++ b/coolamqp/framing/compilation/textcode_fields.py @@ -66,9 +66,10 @@ def get_counter(fields, prefix='', indent_level=2): return (u' '*indent_level)+u'return '+(u' + '.join([str(accumulator)]+parts))+u'\n' -def get_from_buffer(fields, prefix='', indent_level=2): +def get_from_buffer(fields, prefix='', indent_level=2, remark=False): """ Emit code that collects values from buf:offset, updating offset as progressing. + :param remark: BE FUCKING VERBOSE! #DEBUG """ code = [] def emit(fmt, *args): @@ -88,11 +89,16 @@ def get_from_buffer(fields, prefix='', indent_level=2): to_struct = [] def emit_bits(): + if len(bits) == 0: + return + if remark: + print('Bits are being banged') if all(n == '_' for n in bits): # everything is reserved, lol emit('offset += 1') else: - emit("_bit, = struct.unpack_from('!B', buf, offset)") + to_struct.append(('_bit', 'B')) + emit_structures(dont_do_bits=True) for multiplier, bit in enumerate(bits): if bit != '_': @@ -101,7 +107,11 @@ def get_from_buffer(fields, prefix='', indent_level=2): del bits[:] - def emit_structures(): + def emit_structures(dont_do_bits=False): + if not dont_do_bits: + emit_bits() + if len(to_struct) == 0: + return fffnames = [a for a, b in to_struct if a != u'_'] # skip reserved ffffmts = [b for a, b in to_struct] emit("%s, = struct.unpack_from('!%s', buf, offset)", u', '.join(fffnames), u''.join(ffffmts)) @@ -112,23 +122,33 @@ def get_from_buffer(fields, prefix='', indent_level=2): for field in fields: fieldname = prefix+format_field_name(field.name) - if (len(bits) > 0) and (field.basic_type != 'bit'): + if (len(bits) > 0) and (field.basic_type != u'bit'): emit_bits() + if remark: + print('Doing', fieldname, 'of type', field.basic_type) + # offset is current start # length is length to read if BASIC_TYPES[field.basic_type][0] is not None: # static type shit has + + assert len(bits) == 0 + if field.reserved: to_struct.append((u'_', '%sx' % (BASIC_TYPES[field.basic_type][0],))) else: to_struct.append((fieldname, BASIC_TYPES[field.basic_type][1])) + ln['ln'] += BASIC_TYPES[field.basic_type][0] elif field.basic_type == u'bit': bits.append('_' if field.reserved else fieldname) elif field.basic_type == u'table': # oh my god - if len(to_struct) > 0: - emit_structures() + emit_structures() + + assert len(bits) == 0 + assert len(to_struct) == 0 + emit("%s, delta = deframe_table(buf, offset)", fieldname) emit("offset += delta") else: # longstr or shortstr @@ -146,10 +166,7 @@ def get_from_buffer(fields, prefix='', indent_level=2): if len(bits) == 8: emit_bits() - if len(bits) > 0: - emit_bits() - if len(to_struct) > 0: - emit_structures() + emit_structures() return u''.join(code) diff --git a/coolamqp/framing/definitions.py b/coolamqp/framing/definitions.py index 2959949cf8dbb936b69d275d28cbf7022ba68556..3f1505ec1cf1cb07ef9feb1d4d305f2d1069b711 100644 --- a/coolamqp/framing/definitions.py +++ b/coolamqp/framing/definitions.py @@ -924,6 +924,7 @@ class ChannelFlow(AMQPMethodPayload): def from_buffer(buf, start_offset): offset = start_offset _bit, = struct.unpack_from('!B', buf, offset) + offset += 0 active = bool(_bit >> 0) offset += 1 return ChannelFlow(active) @@ -971,6 +972,7 @@ class ChannelFlowOk(AMQPMethodPayload): def from_buffer(buf, start_offset): offset = start_offset _bit, = struct.unpack_from('!B', buf, offset) + offset += 0 active = bool(_bit >> 0) offset += 1 return ChannelFlowOk(active) @@ -1144,6 +1146,7 @@ class ExchangeBind(AMQPMethodPayload): routing_key = buf[offset:offset+s_len] offset += s_len _bit, = struct.unpack_from('!B', buf, offset) + offset += 0 no_wait = bool(_bit >> 0) offset += 1 arguments, delta = deframe_table(buf, offset) @@ -1286,6 +1289,7 @@ class ExchangeDeclare(AMQPMethodPayload): type_ = buf[offset:offset+s_len] offset += s_len _bit, = struct.unpack_from('!B', buf, offset) + offset += 0 passive = bool(_bit >> 0) durable = bool(_bit >> 1) auto_delete = bool(_bit >> 2) @@ -1356,6 +1360,7 @@ class ExchangeDelete(AMQPMethodPayload): exchange = buf[offset:offset+s_len] offset += s_len _bit, = struct.unpack_from('!B', buf, offset) + offset += 0 if_unused = bool(_bit >> 0) no_wait = bool(_bit >> 1) offset += 1 @@ -1499,6 +1504,7 @@ class ExchangeUnbind(AMQPMethodPayload): routing_key = buf[offset:offset+s_len] offset += s_len _bit, = struct.unpack_from('!B', buf, offset) + offset += 0 no_wait = bool(_bit >> 0) offset += 1 arguments, delta = deframe_table(buf, offset) @@ -1636,6 +1642,7 @@ class QueueBind(AMQPMethodPayload): routing_key = buf[offset:offset+s_len] offset += s_len _bit, = struct.unpack_from('!B', buf, offset) + offset += 0 no_wait = bool(_bit >> 0) offset += 1 arguments, delta = deframe_table(buf, offset) @@ -1768,6 +1775,7 @@ class QueueDeclare(AMQPMethodPayload): queue = buf[offset:offset+s_len] offset += s_len _bit, = struct.unpack_from('!B', buf, offset) + offset += 0 passive = bool(_bit >> 0) durable = bool(_bit >> 1) exclusive = bool(_bit >> 2) @@ -1844,6 +1852,7 @@ class QueueDelete(AMQPMethodPayload): queue = buf[offset:offset+s_len] offset += s_len _bit, = struct.unpack_from('!B', buf, offset) + offset += 0 if_unused = bool(_bit >> 0) if_empty = bool(_bit >> 1) no_wait = bool(_bit >> 2) @@ -2008,6 +2017,7 @@ class QueuePurge(AMQPMethodPayload): queue = buf[offset:offset+s_len] offset += s_len _bit, = struct.unpack_from('!B', buf, offset) + offset += 0 no_wait = bool(_bit >> 0) offset += 1 return QueuePurge(queue, no_wait) @@ -2285,7 +2295,7 @@ class BasicContentPropertyList(AMQPContentPropertyList): """ # extract property flags pfl = 2 - while ord(buf[offset + pfl]) & 1: + while ord(buf[offset + pfl - 1]) & 1: pfl += 2 zpf = BasicContentPropertyList.zero_property_flags(buf[offset:offset+pfl]) if zpf in BasicContentPropertyList.PARTICULAR_CLASSES: @@ -2351,11 +2361,10 @@ class BasicAck(AMQPMethodPayload): @staticmethod def from_buffer(buf, start_offset): offset = start_offset - _bit, = struct.unpack_from('!B', buf, offset) + delivery_tag, _bit, = struct.unpack_from('!QB', buf, offset) + offset += 8 multiple = bool(_bit >> 0) offset += 1 - delivery_tag, = struct.unpack_from('!Q', buf, offset) - offset += 8 return BasicAck(delivery_tag, multiple) @@ -2443,6 +2452,7 @@ class BasicConsume(AMQPMethodPayload): consumer_tag = buf[offset:offset+s_len] offset += s_len _bit, = struct.unpack_from('!B', buf, offset) + offset += 0 no_local = bool(_bit >> 0) no_ack = bool(_bit >> 1) exclusive = bool(_bit >> 2) @@ -2514,6 +2524,7 @@ class BasicCancel(AMQPMethodPayload): consumer_tag = buf[offset:offset+s_len] offset += s_len _bit, = struct.unpack_from('!B', buf, offset) + offset += 0 no_wait = bool(_bit >> 0) offset += 1 return BasicCancel(consumer_tag, no_wait) @@ -2679,11 +2690,12 @@ class BasicDeliver(AMQPMethodPayload): offset += 1 consumer_tag = buf[offset:offset+s_len] offset += s_len - _bit, = struct.unpack_from('!B', buf, offset) + delivery_tag, _bit, = struct.unpack_from('!QB', buf, offset) + offset += 8 redelivered = bool(_bit >> 0) offset += 1 - delivery_tag, s_len, = struct.unpack_from('!QB', buf, offset) - offset += 9 + s_len, = struct.unpack_from('!B', buf, offset) + offset += 1 exchange = buf[offset:offset+s_len] offset += s_len s_len, = struct.unpack_from('!B', buf, offset) @@ -2746,6 +2758,7 @@ class BasicGet(AMQPMethodPayload): queue = buf[offset:offset+s_len] offset += s_len _bit, = struct.unpack_from('!B', buf, offset) + offset += 0 no_ack = bool(_bit >> 0) offset += 1 return BasicGet(queue, no_ack) @@ -2811,11 +2824,12 @@ class BasicGetOk(AMQPMethodPayload): @staticmethod def from_buffer(buf, start_offset): offset = start_offset - _bit, = struct.unpack_from('!B', buf, offset) + delivery_tag, _bit, = struct.unpack_from('!QB', buf, offset) + offset += 8 redelivered = bool(_bit >> 0) offset += 1 - delivery_tag, s_len, = struct.unpack_from('!QB', buf, offset) - offset += 9 + s_len, = struct.unpack_from('!B', buf, offset) + offset += 1 exchange = buf[offset:offset+s_len] offset += s_len s_len, = struct.unpack_from('!B', buf, offset) @@ -2925,12 +2939,11 @@ class BasicNack(AMQPMethodPayload): @staticmethod def from_buffer(buf, start_offset): offset = start_offset - _bit, = struct.unpack_from('!B', buf, offset) + delivery_tag, _bit, = struct.unpack_from('!QB', buf, offset) + offset += 8 multiple = bool(_bit >> 0) requeue = bool(_bit >> 1) offset += 1 - delivery_tag, = struct.unpack_from('!Q', buf, offset) - offset += 8 return BasicNack(delivery_tag, multiple, requeue) @@ -3013,6 +3026,7 @@ class BasicPublish(AMQPMethodPayload): routing_key = buf[offset:offset+s_len] offset += s_len _bit, = struct.unpack_from('!B', buf, offset) + offset += 0 mandatory = bool(_bit >> 0) immediate = bool(_bit >> 1) offset += 1 @@ -3090,11 +3104,10 @@ class BasicQos(AMQPMethodPayload): @staticmethod def from_buffer(buf, start_offset): offset = start_offset - _bit, = struct.unpack_from('!B', buf, offset) + prefetch_size, prefetch_count, _bit, = struct.unpack_from('!IHB', buf, offset) + offset += 6 global_ = bool(_bit >> 0) offset += 1 - prefetch_size, prefetch_count, = struct.unpack_from('!IH', buf, offset) - offset += 6 return BasicQos(prefetch_size, prefetch_count, global_) @@ -3249,11 +3262,10 @@ class BasicReject(AMQPMethodPayload): @staticmethod def from_buffer(buf, start_offset): offset = start_offset - _bit, = struct.unpack_from('!B', buf, offset) + delivery_tag, _bit, = struct.unpack_from('!QB', buf, offset) + offset += 8 requeue = bool(_bit >> 0) offset += 1 - delivery_tag, = struct.unpack_from('!Q', buf, offset) - offset += 8 return BasicReject(delivery_tag, requeue) @@ -3302,6 +3314,7 @@ class BasicRecoverAsync(AMQPMethodPayload): def from_buffer(buf, start_offset): offset = start_offset _bit, = struct.unpack_from('!B', buf, offset) + offset += 0 requeue = bool(_bit >> 0) offset += 1 return BasicRecoverAsync(requeue) @@ -3352,6 +3365,7 @@ class BasicRecover(AMQPMethodPayload): def from_buffer(buf, start_offset): offset = start_offset _bit, = struct.unpack_from('!B', buf, offset) + offset += 0 requeue = bool(_bit >> 0) offset += 1 return BasicRecover(requeue) @@ -3653,6 +3667,7 @@ class ConfirmSelect(AMQPMethodPayload): def from_buffer(buf, start_offset): offset = start_offset _bit, = struct.unpack_from('!B', buf, offset) + offset += 0 nowait = bool(_bit >> 0) offset += 1 return ConfirmSelect(nowait) diff --git a/coolamqp/messages.py b/coolamqp/messages.py index 2a2dde2f04b6bcf4f16ddfbf3dacff8ae1fcca25..35c7b6896a77da2afff4d54ff9f8bd7f1862b417 100644 --- a/coolamqp/messages.py +++ b/coolamqp/messages.py @@ -22,10 +22,16 @@ class Message(object): self.properties = properties or {} +LAMBDA_NONE = lambda: None + class ReceivedMessage(Message): """Message as received from AMQP system""" - def __init__(self, body, cht, connect_id, exchange_name, routing_key, properties=None, delivery_tag=None): + def __init__(self, body, exchange_name, routing_key, + properties=None, + delivery_tag=None, + ack=None, + nack=None): """ :param body: message body. A stream of octets. :type body: str (py2) or bytes (py3) @@ -34,39 +40,22 @@ class ReceivedMessage(Message): not to ack messages that were received from a dead connection :param exchange_name: name of exchange this message was submitted to :param routing_key: routing key with which this message was sent - :param properties: dictionary. Headers received from AMQP or None for empty dict + :param properties: a suitable BasicContentPropertyList subinstance - :param delivery_tag: delivery tag assigned by AMQP broker to confirm this message. - leave None if auto-ack + :param delivery_tag: delivery tag assigned by AMQP broker to confirm this message + :param ack: a callable to call when you want to ack (via basic.ack) this message. None if received + by the no-ack mechanism + :param nack: a callable to call when you want to nack (via basic.reject) this message. None if received + by the no-ack mechanism """ Message.__init__(self, body, properties=properties) - self.cht = cht - self.connect_id = connect_id self.delivery_tag = delivery_tag self.exchange_name = exchange_name self.routing_key = routing_key - def nack(self, on_completed=None): - """ - Negative-acknowledge this message to the broker. - - This internally results in a basic.reject - - :param on_completed: callable/0 to call on acknowledged. Callable will be executed in - ClusterHandlerThread's context. - :return: an Order, that can ve waited upon for a result - """ - return self.cht._do_nackmessage(self, on_completed=on_completed) - - def ack(self, on_completed=None): - """ - Acknowledge this message to the broker. - :param on_completed: callable/0 to call on acknowledged. Callable will be executed in - ClusterHandlerThread's context. - :return: an Order, that can ve waited upon for a result - """ - return self.cht._do_ackmessage(self, on_completed=on_completed) + self.ack = ack or LAMBDA_NONE + self.nack = nack or LAMBDA_NONE class Exchange(object): diff --git a/coolamqp/uplink/__init__.py b/coolamqp/uplink/__init__.py index 93359f8ae5108b589e4cc1d0ec2edded892c7ab4..56ef2cf3f8c87073672882ec2f681328fc2054da 100644 --- a/coolamqp/uplink/__init__.py +++ b/coolamqp/uplink/__init__.py @@ -11,5 +11,5 @@ Watches will fire upon an event triggering them. """ from __future__ import absolute_import, division, print_function -from coolamqp.uplink.connection import Connection, HeaderOrBodyWatch, MethodWatch +from coolamqp.uplink.connection import Connection, HeaderOrBodyWatch, MethodWatch, AnyWatch from coolamqp.uplink.listener import ListenerThread diff --git a/coolamqp/uplink/connection/__init__.py b/coolamqp/uplink/connection/__init__.py index f32f3c97c7ebcaf39d929792a2a83a7fc41b15aa..7ec0d01c5042462dcf11aa52740f21c667e1f0ae 100644 --- a/coolamqp/uplink/connection/__init__.py +++ b/coolamqp/uplink/connection/__init__.py @@ -12,5 +12,5 @@ Connection is something that can: from __future__ import absolute_import, division, print_function from coolamqp.uplink.connection.connection import Connection -from coolamqp.uplink.connection.watches import FailWatch, Watch, HeaderOrBodyWatch, MethodWatch +from coolamqp.uplink.connection.watches import FailWatch, Watch, HeaderOrBodyWatch, MethodWatch, AnyWatch from coolamqp.uplink.connection.states import ST_OFFLINE, ST_CONNECTING, ST_ONLINE diff --git a/coolamqp/uplink/connection/connection.py b/coolamqp/uplink/connection/connection.py index 53aae6a04a81f6bff7ed71be046e1bde92da0be5..278fde553714857ec2dfb9d398c7e68fe73a2cbc 100644 --- a/coolamqp/uplink/connection/connection.py +++ b/coolamqp/uplink/connection/connection.py @@ -21,7 +21,9 @@ class Connection(object): """ An object that manages a connection in a comprehensive way. - It allows for sending and registering watches for particular things. + It allows for sending and registering watches for particular things. Watch will + listen for eg. frame on particular channel, frame on any channel, or connection teardown. + Watches will also get a callback for connection being non-operational (eg. torn down). WARNING: Thread-safety of watch operation hinges on atomicity of .append and .pop. @@ -48,6 +50,7 @@ class Connection(object): self.recvf = ReceivingFramer(self.on_frame) self.watches = {} # channel => list of [Watch instance] + self.any_watches = [] # list of Watches that should check everything self.state = ST_CONNECTING @@ -119,7 +122,11 @@ class Connection(object): for watch in watchlist: watch.failed() + for watch in self.any_watches: + watch.failed() + self.watches = {} # Clear the watch list + self.any_watches = [] def on_connection_close(self, payload): """ @@ -135,18 +142,18 @@ class Connection(object): elif isinstance(payload, ConnectionCloseOk): self.send(None) - def send(self, frames): + def send(self, frames, priority=False): """ :param frames: list of frames or None to close the link :param reason: optional human-readable reason for this action """ if frames is not None: - for frame in frames: - if isinstance(frame, AMQPMethodFrame): - print('Sending ', frame.payload) - else: - print('Sending ', frame) - self.sendf.send(frames) + # for frame in frames: + # if isinstance(frame, AMQPMethodFrame): + # print('Sending ', frame.payload) + # else: + # print('Sending ', frame) + self.sendf.send(frames, priority=priority) else: # Listener socket will kill us when time is right self.listener_socket.send(None) @@ -162,12 +169,14 @@ class Connection(object): :param frame: AMQPFrame that was received """ - if isinstance(frame, AMQPMethodFrame): # temporary, for debugging - print('RECEIVED', frame.payload.NAME) - else: - print('RECEIVED ', frame) + # if isinstance(frame, AMQPMethodFrame): # temporary, for debugging + # print('RECEIVED', frame.payload.NAME) + # else: + # print('RECEIVED ', frame) watch_handled = False # True if ANY watch handled this + + # ==================== process per-channel watches if frame.channel in self.watches: watches = self.watches[frame.channel] # a list @@ -188,6 +197,20 @@ class Connection(object): for watch in alive_watches: watches.append(watch) + # ==================== process "any" watches + alive_watches = [] + while len(self.any_watches): + watch = self.any_watches.pop() + watch_triggered = watch.is_triggered_by(frame) + watch_handled |= watch_triggered + + if (not watch_triggered) or (not watch.oneshot): + # Watch remains alive if it was NOT triggered, or it's NOT a oneshot + alive_watches.append(watch) + + for watch in alive_watches: + self.any_watches.append(watch) + if not watch_handled: logger.critical('Unhandled frame %s', frame) @@ -212,7 +235,9 @@ class Connection(object): Register a watch. :param watch: Watch to register """ - if watch.channel not in self.watches: + if watch.channel is None: + self.any_watches.append(watch) + elif watch.channel not in self.watches: self.watches[watch.channel] = collections.deque([watch]) else: self.watches[watch.channel].append(watch) diff --git a/coolamqp/uplink/connection/send_framer.py b/coolamqp/uplink/connection/send_framer.py index 14b76fdcc2436d8ac4499774014cddc8c213908d..bb07e45552b6fe213454b344fdd68805c250b41a 100644 --- a/coolamqp/uplink/connection/send_framer.py +++ b/coolamqp/uplink/connection/send_framer.py @@ -28,15 +28,17 @@ class SendingFramer(object): """ def __init__(self, on_send=lambda data: None): """ - :param on_send: a callable that can be called with some data to send + :param on_send: a callable(data, priority=False) that can be called with some data to send + data will always be entire AMQP frames! """ self.on_send = on_send - def send(self, frames): + def send(self, frames, priority=False): """ Schedule to send some frames. :param frames: list of AMQPFrame instances + :param priority: preempty existing frames """ length = sum(frame.get_size() for frame in frames) buf = io.BytesIO(bytearray(length)) @@ -45,4 +47,4 @@ class SendingFramer(object): frame.write_to(buf) q = buf.getvalue() - self.on_send(q) + self.on_send(q, priority) diff --git a/coolamqp/uplink/connection/watches.py b/coolamqp/uplink/connection/watches.py index 4f73cfbe67e6c19382940c0ddee8b0807f50ee2b..ef61487823e00ff610d81fb8c5caad220061e7f0 100644 --- a/coolamqp/uplink/connection/watches.py +++ b/coolamqp/uplink/connection/watches.py @@ -11,7 +11,8 @@ class Watch(object): def __init__(self, channel, oneshot): """ - :param channel: Channel to listen to + :param channel: Channel to listen to. + all channels if None is passed :param oneshot: Is destroyed after triggering? """ self.channel = channel @@ -41,6 +42,28 @@ class Watch(object): self.cancelled = True +class AnyWatch(Watch): + """ + Watch that listens for any frame. + + It does not listen for failures. + + Used because heartbeating is implemented improperly EVERYWHERE + (ie. you might not get a heartbeat when connection is so loaded it just can't get it in time, + due to loads and loads of message exchanging). + + Eg. RabbitMQ will happily disconnect you if you don't, but it can get lax with heartbeats + as it wants. + """ + def __init__(self, callable): + Watch.__init__(self, None, False) + self.callable = callable + + def is_triggered_by(self, frame): + self.callable(frame) + return True + + class FailWatch(Watch): """ A special kind of watch that fires when connection has died @@ -56,21 +79,6 @@ class FailWatch(Watch): self.callable() -class HeartbeatWatch(Watch): - """ - Registered if heartbeats are enabled - """ - def __init__(self, callable): - Watch.__init__(self, 0, False) - self.callable = callable - - def is_triggered_by(self, frame): - if isinstance(frame, AMQPHeartbeatFrame): - self.callable() - return True - return False - - class HeaderOrBodyWatch(Watch): """ A multi-shot watch listening for AMQP header or body frames diff --git a/coolamqp/uplink/heartbeat.py b/coolamqp/uplink/heartbeat.py index 79cca8715e1120db85400f74a91c86079108934d..bb17fc0dff5cc5c1317c39eed91b47328382c453 100644 --- a/coolamqp/uplink/heartbeat.py +++ b/coolamqp/uplink/heartbeat.py @@ -3,7 +3,8 @@ from __future__ import absolute_import, division, print_function import monotonic from coolamqp.framing.frames import AMQPHeartbeatFrame -from coolamqp.uplink.connection.watches import HeartbeatWatch +from coolamqp.uplink.connection.watches import AnyWatch + class Heartbeater(object): """ @@ -17,18 +18,35 @@ class Heartbeater(object): self.last_heartbeat_on = monotonic.monotonic() # last heartbeat from server self.connection.watchdog(self.heartbeat_interval, self.on_timer) - self.connection.watch(HeartbeatWatch(self.on_heartbeat)) + self.connection.watch(AnyWatch(self.on_heartbeat)) - def on_heartbeat(self): + def on_heartbeat(self, frame): + print('Heart Beat!') + self.last_heartbeat_on = monotonic.monotonic() + + def on_any_frame(self): + """ + Hehehe, most AMQP servers are not AMQP-compliant. + Consider a situation where you just got like a metric shitton of messages, + and the TCP connection is bustin' filled with those frames. + + Server should still be able to send a heartbeat frame, but it doesn't, because of the queue, and + BANG, dead. + + I know I'm being picky, but at least I implement this behaviour correctly - see priority argument in send. + + Anyway, we should register an all-watch for this. + """ self.last_heartbeat_on = monotonic.monotonic() def on_timer(self): """Timer says we should send a heartbeat""" - self.connection.send([AMQPHeartbeatFrame()]) + self.connection.send([AMQPHeartbeatFrame()], priority=True) + print('Timer') if (monotonic.monotonic() - self.last_heartbeat_on) > 2*self.heartbeat_interval: # closing because of heartbeat + print('TERMINATING BECAUSE NO HEARTBEAT!!!!') self.connection.send(None) self.connection.watchdog(self.heartbeat_interval, self.on_timer) - diff --git a/coolamqp/uplink/listener/epoll_listener.py b/coolamqp/uplink/listener/epoll_listener.py index fa382731f440ada4bc29adc5ade195bf94090f9c..3177fc9b88e13a39905c6ac97651d73bd055fe20 100644 --- a/coolamqp/uplink/listener/epoll_listener.py +++ b/coolamqp/uplink/listener/epoll_listener.py @@ -4,6 +4,7 @@ import six import logging import select import monotonic +import collections import heapq from coolamqp.uplink.listener.socket import SocketFailed, BaseSocket @@ -12,6 +13,10 @@ from coolamqp.uplink.listener.socket import SocketFailed, BaseSocket logger = logging.getLogger(__name__) +RO = select.EPOLLIN | select.EPOLLHUP | select.EPOLLERR +RW = RO | select.EPOLLOUT + + class EpollSocket(BaseSocket): """ EpollListener substitutes your BaseSockets with this @@ -19,16 +24,11 @@ class EpollSocket(BaseSocket): def __init__(self, sock, on_read, on_fail, listener): BaseSocket.__init__(self, sock, on_read=on_read, on_fail=on_fail) self.listener = listener + self.priority_queue = collections.deque() - def get_epoll_eventset(self): - if len(self.data_to_send) > 0: - return select.EPOLLIN | select.EPOLLERR | select.EPOLLHUP | select.EPOLLOUT - else: - return select.EPOLLIN | select.EPOLLERR | select.EPOLLHUP - - def send(self, data): - self.data_to_send.append(data) - self.listener.epoll.modify(self, self.get_epoll_eventset()) + def send(self, data, priority=False): + BaseSocket.send(self, data, priority=priority) + self.listener.epoll.modify(self, RW) def oneshot(self, seconds_after, callable): """ @@ -59,6 +59,13 @@ class EpollListener(object): def wait(self, timeout=1): events = self.epoll.poll(timeout=timeout) + + # Timer events + mono = monotonic.monotonic() + while len(self.time_events) > 0 and (self.time_events[0][0] < mono): + ts, fd, callback = heapq.heappop(self.time_events) + callback() + for fd, event in events: sock = self.fd_to_sock[fd] @@ -66,23 +73,21 @@ class EpollListener(object): try: if event & (select.EPOLLERR | select.EPOLLHUP): raise SocketFailed() - elif event & select.EPOLLIN: + + if event & select.EPOLLIN: sock.on_read() - elif event & select.EPOLLOUT: - sock.on_write() + + if event & select.EPOLLOUT: + if sock.on_write(): + # I'm done with sending for now + self.epoll.modify(sock.fileno(), RW) + except SocketFailed: self.epoll.unregister(fd) del self.fd_to_sock[fd] sock.on_fail() self.noshot(sock) sock.close() - else: - self.epoll.modify(fd, sock.get_epoll_eventset()) - - # Timer events - while len(self.time_events) > 0 and (self.time_events[0][0] < monotonic.monotonic()): - ts, fd, callback = heapq.heappop(self.time_events) - callback() def noshot(self, sock): """ @@ -133,6 +138,6 @@ class EpollListener(object): sock = EpollSocket(sock, on_read, on_fail, self) self.fd_to_sock[sock.fileno()] = sock - self.epoll.register(sock, sock.get_epoll_eventset()) + self.epoll.register(sock, RW) return sock diff --git a/coolamqp/uplink/listener/socket.py b/coolamqp/uplink/listener/socket.py index 2f97b256f7b3d21569f9dfa4836f872726308f99..3f95d797f0cd99d81c4145726a1324e2b0e4f6e3 100644 --- a/coolamqp/uplink/listener/socket.py +++ b/coolamqp/uplink/listener/socket.py @@ -35,17 +35,23 @@ class BaseSocket(object): assert sock is not None self.sock = sock self.data_to_send = collections.deque() + self.priority_queue = collections.deque() # when a piece of data is finished, this queue is checked first self.my_on_read = on_read self.on_fail = on_fail self.on_time = on_time - def send(self, data): + def send(self, data, priority=True): """ - Schedule to send some data + Schedule to send some data. - :param data: data to send, or None to terminate this socket + :param data: data to send, or None to terminate this socket. + Note that data will be sent atomically, ie. without interruptions. + :param priority: preempt other datas. Property of sending data atomically will be maintained. """ - raise Exception('Abstract; listener should override that') + if priority: + self.priority_queue.append(data) + else: + self.data_to_send.append(data) def oneshot(self, seconds_after, callable): """ @@ -81,13 +87,18 @@ class BaseSocket(object): def on_write(self): """ Socket is writable, called by Listener - :return: (bool) I finished sending all the data for now :raises SocketFailed: on socket error + :return: True if I'm done sending shit for now """ - if len(self.data_to_send) == 0: - return True # No data to send - while len(self.data_to_send) > 0: + while True: + if len(self.data_to_send) == 0: + if len(self.priority_queue) == 0: + return True + else: + self.data_to_send.appendleft(self.priority_queue.popleft()) + + assert len(self.data_to_send) > 0 if self.data_to_send[0] is None: raise SocketFailed() # We should terminate the connection! @@ -100,11 +111,15 @@ class BaseSocket(object): if sent < len(self.data_to_send[0]): # Not everything could be sent self.data_to_send[0] = buffer(self.data_to_send[0], sent) - return False # I want to send more + return False else: - self.data_to_send.popleft() # Sent all! + # Looks like everything has been sent + self.data_to_send.popleft() # mark as sent - return True # all for now + if len(self.priority_queue) > 0: + # We can send a priority pack + print('Deploying priority data') + self.data_to_send.appendleft(self.priority_queue.popleft()) def fileno(self): """Return descriptor number""" diff --git a/tests/run.py b/tests/run.py index 522664a8e1fe9299acf43226a9d8679268f3fdc5..d1fca1139a083f15d894f0c7e62e6cd608f13c6b 100644 --- a/tests/run.py +++ b/tests/run.py @@ -11,7 +11,7 @@ from coolamqp.messages import Queue NODE = NodeDefinition('127.0.0.1', 5672, 'user', 'user', heartbeat=5) -logging.basicConfig(level=logging.INFO) +logging.basicConfig(level=logging.DEBUG) if __name__ == '__main__': lt = ListenerThread() @@ -20,8 +20,7 @@ if __name__ == '__main__': con = Connection(NODE, lt) con.start() - - cons = Consumer(Queue('siema-eniu', auto_delete=True, exclusive=True)) + cons = Consumer(Queue('siema-eniu'), no_ack=False) cons.attach(con) while True: