From 01873aaa20ee75d0c1e537b78a2b938fe1d31439 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Ma=C5=9Blanka?= <piotr.maslanka@henrietta.com.pl> Date: Wed, 4 Jan 2017 10:49:43 +0100 Subject: [PATCH] Consumer can receive a message --- coolamqp/attaches/__init__.py | 10 + coolamqp/attaches/consumer.py | 323 ++++++++++++++++++ coolamqp/connection/consumer.py | 291 ---------------- .../framing/compilation/textcode_fields.py | 4 + coolamqp/framing/definitions.py | 15 +- coolamqp/uplink/__init__.py | 2 +- coolamqp/uplink/connection/__init__.py | 3 +- coolamqp/uplink/connection/connection.py | 50 ++- coolamqp/uplink/connection/watches.py | 17 +- coolamqp/uplink/handshake.py | 3 +- coolamqp/uplink/listener/epoll_listener.py | 2 - setup.cfg | 3 + tests/run.py | 11 +- 13 files changed, 424 insertions(+), 310 deletions(-) create mode 100644 coolamqp/attaches/__init__.py create mode 100644 coolamqp/attaches/consumer.py delete mode 100644 coolamqp/connection/consumer.py diff --git a/coolamqp/attaches/__init__.py b/coolamqp/attaches/__init__.py new file mode 100644 index 0000000..7d31da1 --- /dev/null +++ b/coolamqp/attaches/__init__.py @@ -0,0 +1,10 @@ +# coding=UTF-8 +from __future__ import absolute_import, division, print_function +""" +Attaches are components that attach to an coolamqp.uplink.Connection and perform some duties +These duties almost require allocating a channel. The attache becomes then responsible for closing this channel. + +Attache should also register at least one on_fail watch, so it can handle things if they go south. +""" + +from coolamqp.attaches.consumer import Consumer \ No newline at end of file diff --git a/coolamqp/attaches/consumer.py b/coolamqp/attaches/consumer.py new file mode 100644 index 0000000..119ff54 --- /dev/null +++ b/coolamqp/attaches/consumer.py @@ -0,0 +1,323 @@ +# coding=UTF-8 +from __future__ import absolute_import, division, print_function +import uuid +from coolamqp.framing.frames import AMQPMethodFrame, AMQPBodyFrame, AMQPHeaderFrame +from coolamqp.framing.definitions import ChannelOpen, ChannelOpenOk, BasicConsume, \ + BasicConsumeOk, QueueDeclare, QueueDeclareOk, ExchangeDeclare, ExchangeDeclareOk, \ + QueueBind, QueueBindOk, ChannelClose, ChannelCloseOk, BasicCancel, BasicDeliver, \ + BasicAck, BasicReject, ACCESS_REFUSED, RESOURCE_LOCKED, BasicCancelOk +from coolamqp.uplink import HeaderOrBodyWatch, MethodWatch + + +ST_OFFLINE = 0 # Consumer is *not* consuming, no setup attempts are being made +ST_SYNCING = 1 # A process targeted at consuming has been started +ST_ONLINE = 2 # Consumer is declared all right + + +EV_ONLINE = 0 # called upon consumer being online and consuming +EV_CANCEL = 1 # consumer has been cancelled by BasicCancel. + # EV_OFFLINE will follow immediately +EV_OFFLINE = 2 # channel down +EV_MESSAGE = 3 # received a message + +class Consumer(object): + """ + This object represents a consumer in the system. + + Consumer may reside on any AMQP broker, this is to be decided by CoolAMQP. + Consumer, when created, has the state of ST_SYNCING. CoolAMQP will + try to declare the consumer where it makes most sense for it to be. + + If it succeeds, the consumer will enter state ST_ONLINE, and callables + on_start will be called. This means that broker has confirmed that this + consumer is operational and receiving messages. + + Note that does not attempt to cancel consumers, or any of such nonsense. Having + a channel per consumer gives you the unique possibility of simply closing the channel. + Since this implies cancelling the consumer, here you go. + """ + + def attach(self, connection): + """ + Attach this consumer to a connection + :param connection: coolamqp.framing.Connection + """ + self.connection = connection + connection.call_on_connected(self.on_uplink_established) + + def __init__(self, queue, no_ack=True, qos=None, dont_pause=False): + """ + To be instantiated only by Cluster + + :param state: state of the consumer + :param queue: Queue object, being consumed from right now. + Note that name of anonymous queue might change at any time! + :param no_ack: Will this consumer require acknowledges from messages? + :param dont_pause: Consumer will fail on the spot instead of pausing + """ + self.state = ST_OFFLINE + self.queue = queue + self.no_ack = no_ack + + # private + self.connection = None # broker on which was last seen + self.channel_id = None + self.cancelled = False # did the client want to STOP using this consumer? + self.receiver = None # MessageReceiver instance + + + def on_event(self, event, arg=None): + """ + Called upon events arriving. Possible events are: + arg={EV_ONLINE} called directly after setup. self.state is not yet set! + args={EV_CANCEL} seen a RabbitMQ consumer cancel notify. EV_OFFLINE follows + args={EV_OFFLINE} sent when a channel is no longer usable. It may not yet have been torn down. + arg={EV_MESSAGE} + """ + if event == EV_OFFLINE and (self.state is not ST_ONLINE): + return # No point in processing that + + if event == EV_ONLINE: + self.state = ST_ONLINE + assert self.receiver is None + self.receiver = MessageReceiver(self) + + elif event == EV_OFFLINE: + self.receiver = None + + def __stateto(self, st): + """if st is not current state, statify it. + As an extra if it's a transition to ST_OFFLINE, send an event""" + if (self.state != ST_OFFLINE) and (st == ST_OFFLINE): + self.on_event(ST_OFFLINE) + else: + self.state = st + + def on_close(self, payload=None): + """ + Handle closing the channel. It sounds like an exception... + + This is done in two steps: + 1. self.state <- ST_OFFLINE, on_event(EV_OFFLINE) upon detecting that no more messages will + be there + 2. self.channel_id <- None, channel is returned to Connection - channel has been physically torn down + """ + should_retry = False + release_channel = False + + if isinstance(payload, BasicCancel): + # Consumer Cancel Notification - by RabbitMQ + self.on_event(EV_CANCEL) + self.__stateto(ST_OFFLINE) + self.connection.send([AMQPMethodFrame(self.channel_id, BasicCancelOk()), + AMQPMethodFrame(self.channel_id, ChannelClose(0, b'Received basic.cancel', 0, 0))]) + return + + if isinstance(payload, BasicCancelOk): + self.__stateto(ST_OFFLINE) + # proceed with teardown + self.connection.send([AMQPMethodFrame(self.channel_id, ChannelClose(0, b'Received basic.cancel-ok', 0, 0))]) + return + + # at this point this can be only ChannelClose, ChannelCloseOk or on_fail + # release the kraken + + if isinstance(payload, ChannelClose): + # it sounds like an exception + self.__stateto(ST_OFFLINE) + print(payload.reply_code, payload.reply_text) + self.connection.send([AMQPMethodFrame(self.channel_id, ChannelCloseOk())]) + should_retry = payload.reply_code in (ACCESS_REFUSED, RESOURCE_LOCKED) + + if self.channel_id is not None: + self.__stateto(ST_OFFLINE) + self.connection.unwatch_all(self.channel_id) + self.connection.free_channels.append(self.channel_id) + self.channel_id = None + self.remaining_for_ack = set() + + if should_retry: + self.on_uplink_established() # retry + + def on_delivery(self, sth): + """ + Callback for delivery-related shit + :param sth: AMQPMethodFrame WITH basic-deliver, AMQPHeaderFrame or AMQPBodyFrame + """ + if isinstance(sth, BasicDeliver): + self.receiver.on_basic_deliver(sth) + elif isinstance(sth, AMQPBodyFrame): + self.receiver.on_body(sth.data) + elif isinstance(sth, AMQPHeaderFrame): + self.receiver.on_head(sth) + + # No point in listening for more stuff, that's all the watches even listen for + + def on_setup(self, payload): + """Called with different kinds of frames - during setup""" + + if self.cancelled: + # We were declaring this, but during this situation this + # consumer was cancelled. Close the channel and things. + self.connection.send(self.channel_id, ChannelClose(0, 'Consumer cancelled', 0, 0)) + return + + if isinstance(payload, ChannelOpenOk): + # Do we need to declare the exchange? + + if self.queue.exchange is not None: + self.connection.method_and_watch( + self.channel_id, + ExchangeDeclare(self.queue.exchange.name.encode('utf8'), + self.queue.exchange.type.encode('utf8'), + False, + self.queue.exchange.durable, + self.queue.exchange.auto_delete, + False, + False, + []), + ExchangeDeclareOk, + self.on_setup + ) + else: + self.on_setup(ExchangeDeclareOk()) + + elif isinstance(payload, ExchangeDeclareOk): + # Declare the queue + + name = b'' if self.queue.anonymous else self.queue.name.encode('utf8') + + self.connection.method_and_watch( + self.channel_id, + QueueDeclare( + name, + False, + self.queue.durable, + self.queue.exclusive, + self.queue.auto_delete, + False, + [] + ), + QueueDeclareOk, + self.on_setup + ) + + elif isinstance(payload, QueueDeclareOk): + # did we need an anonymous name? + if self.queue.anonymous: + self.queue.name = payload.queue_name.decode('utf8') + + # We need any form of binding. + if self.queue.exchange is not None: + self.connection.method_and_watch( + self.channel_id, + QueueBind( + self.queue.name.encode('utf8'), + self.queue.exchange.name.encode('utf8'), + b'', + False, + [] + ), + QueueBindOk, + self.on_setup + ) + else: + # default exchange, pretend it was bind ok + self.on_setup(QueueBindOk()) + elif isinstance(payload, QueueBindOk): + # itadakimasu + self.connection.method_and_watch( + self.channel_id, + BasicConsume( + self.queue.name.encode('utf8'), + self.queue.name.encode('utf8'), + False, + self.no_ack, + self.queue.exclusive, + False, + [] + ), + BasicConsumeOk, + self.on_setup + ) + + elif isinstance(payload, BasicConsumeOk): + # AWWW RIGHT~!!! + self.state = ST_ONLINE + + # Register watches for receiving shit + self.connection.watch(HeaderOrBodyWatch(self.channel_id, self.on_delivery)) + mw = MethodWatch(self.channel_id, BasicDeliver, self.on_delivery) + mw.oneshot = False + self.connection.watch(mw) + + self.on_event(EV_ONLINE) + + def on_uplink_established(self): + """Consumer was created or uplink was regained. Try to declare it""" + if self.cancelled: + return # it's OK. + + self.state = ST_SYNCING + self.channel_id = self.connection.free_channels.pop() + + self.connection.watch_for_method(self.channel_id, + (ChannelClose, ChannelCloseOk, BasicCancel), + self.on_close, + on_fail=self.on_close) + + self.connection.method_and_watch( + self.channel_id, + ChannelOpen(), + ChannelOpenOk, + self.on_setup + ) + + + +class MessageReceiver(object): + """This is an object that is used to received messages. + + It maintains all the state, and is used to ack/nack messages as well. + + This object is TORN DOWN when a consumer goes offline, + and is recreated when it goes online. + + This is called by consumer upon receiving different parts of the message, + and may opt to kill the connection on bad framing with + self.consumer.connection.send(None) + """ + def __init__(self, consumer): + self.consumer = consumer + self.state = 0 # 0 - waiting for Basic-Deliver + # 1 - waiting for Header + # 2 - waiting for Body [all] + + self.bdeliver = None # payload of Basic-Deliver + self.header = None # AMQPHeaderFrame + self.body = [] # list of payloads + self.data_to_go = None # set on receiving header, how much bytes we need yet + + def on_head(self, frame): + assert self.state == 1 + self.header = frame + self.data_to_go = frame.body_size + self.state = 2 + + def on_basic_deliver(self, payload): + assert self.state == 0 + self.bdeliver = payload + self.state = 1 + + def on_body(self, payload): + assert self.state == 2 + self.body.append(payload) + self.data_to_go -= len(payload) + assert self.data_to_go >= 0 + if self.data_to_go == 0: + # Message A-OK! + print('Yo, got a message of %s' % (u''.join(map(str, self.body)))) + self.state = 0 + + # at this point it's safe to clear the body + self.body = [] diff --git a/coolamqp/connection/consumer.py b/coolamqp/connection/consumer.py deleted file mode 100644 index 4979a64..0000000 --- a/coolamqp/connection/consumer.py +++ /dev/null @@ -1,291 +0,0 @@ -# coding=UTF-8 -from __future__ import absolute_import, division, print_function -import uuid -from coolamqp.framing.frames import AMQPMethodFrame, AMQPBodyFrame, AMQPHeaderFrame -from coolamqp.framing.definitions import ChannelOpen, ChannelOpenOk, BasicConsume, \ - BasicConsumeOk, QueueDeclare, QueueDeclareOk, ExchangeDeclare, ExchangeDeclareOk, \ - QueueBind, QueueBindOk, ChannelClose, ChannelCloseOk, BasicCancel, BasicDeliver, \ - BasicAck, BasicReject, ACCESS_REFUSED, RESOURCE_LOCKED -from coolamqp.uplink import HeaderOrBodyWatch, MethodWatch - - -ST_OFFLINE = 0 # Consumer is *not* consuming, no setup attempts are being made -ST_SYNCING = 1 # A process targeted at consuming has been started -ST_ONLINE = 2 # Consumer is declared all right - - -class Consumer(object): - """ - This object represents a consumer in the system. - - Consumer may reside on any AMQP broker, this is to be decided by CoolAMQP. - Consumer, when created, has the state of ST_SYNCING. CoolAMQP will - try to declare the consumer where it makes most sense for it to be. - - If it succeeds, the consumer will enter state ST_ONLINE, and callables - on_start will be called. This means that broker has confirmed that this - consumer is operational and receiving messages. - - If the consumer gets a message, it will relay it to a specified callable. - The message may need acking or rejecting. - - THIS OBJECT CAN OUTLIVE IT'S BROKER, AND THEREFORE .broker FIELD MUST BE SET - ON A NEW BROKER. HOWEVER, ALL WATCHES MUST BE CALLED BEFOREHAND. - - Note that does not attempt to cancel consumers, or any of such nonsense. Having - a channel per consumer gives you the unique possibility of simply closing the channel. - Since this implies cancelling the consumer, here you go. - """ - - def __init__(self, queue, no_ack=True, qos=None, dont_pause=False): - """ - To be instantiated only by Cluster - - :param state: state of the consumer - :param queue: Queue object, being consumed from right now. - Note that name of anonymous queue might change at any time! - :param no_ack: Will this consumer require acknowledges from messages? - :param dont_pause: Consumer will fail on the spot instead of pausing - """ - self.state = ST_OFFLINE - self.queue = queue - self.no_ack = no_ack - - # private - self.broker = None # broker on which was last seen - self.channel_id = None - - self.cancelled = False # did the client want to STOP using this consumer? - - # state machine for receiving messages - self.recv_state = 0 # 0 - waiting for basic.deliver - # 1 - waiting for header - # 2 - waiting for body - - self.delivery = None # tuple of (delivery tag, exchange, routing_key) - self.properties = None - self.content_parts = [] - self.length_remaining = 0 - - self.remaining_for_ack = set() # unacknowledged delivery tags - - def on_header_or_body_or_delivery(self, frame): - - if isinstance(frame, BasicDeliver) and self.state == 0: - self.delivery = frame.delivery_tag, frame.exchange, frame.routing_key - self.recv_state = 1 - - elif isinstance(frame, AMQPHeaderFrame) and self.state == 1: - self.properties = frame.properties - self.length_remaining = frame.body_size - self.recv_state = 2 - - elif isinstance(frame, AMQPBodyFrame) and self.state == 2: - - self.content_parts.append(frame.payload) - self.length_remaining -= len(frame.payload) - - if self.length_remaining == 0: - self.broker.on_new_message(self, self.delivery[0], - self.delivery[1], - self.delivery[2], - self.properties, - self.content_parts, - not self.no_ack - ) - if not self.no_ack: - self.remaining_for_ack.add(self.delivery[0]) - self.recv_state = 0 - else: - self.broker.connection.send(None, 'state assertion failed') - - def reject(self, consumer, delivery_tag): - - if self.cancelled: - return - - if self != consumer: - return # it was not me - - if delivery_tag not in self.remaining_for_ack: - return # not remaining - - self.broker.connection.send(AMQPMethodFrame( - self.channel_id, - BasicReject(delivery_tag, True) - )) - - self.remaining_for_ack.remove(delivery_tag) - - def acknowledge(self, consumer, delivery_tag): - - if self.cancelled: - return - - if self != consumer: - return # it was not me - - if delivery_tag not in self.remaining_for_ack: - return # not remaining - - self.broker.connection.send(AMQPMethodFrame( - self.channel_id, - BasicAck(delivery_tag, False) - )) - - self.remaining_for_ack.remove(delivery_tag) - - def cancel(self): - """Stop using this consumer""" - self.cancelled = True - - if self.state == ST_ONLINE: - # Consuming, close the channel please - self.broker.connection.send(AMQPMethodFrame(self.channel_id, - ChannelClose( - 0, 'Consumer cancelled', 0, 0 - ))) - - def on_close(self, payload=None): - """Handle closing the channel. It sounds like an exception...""" - - if self.channel_id is None: - return - - should_retry = False - - if isinstance(payload, ChannelClose): - # it sounds like an exception - self.broker.connection.send(AMQPMethodFrame(self.channel_id, - ChannelCloseOk())) - - should_retry = payload.reply_code in (ACCESS_REFUSED, RESOURCE_LOCKED) - - self.broker.connection.unwatch_all(self.channel_id) - self.broker.free_channels.append(self.channel_id) - self.channel_id = None - self.state = ST_OFFLINE - self.remaining_for_ack = set() - self.recv_state = 0 - - if should_retry: - # retry - self.on_uplink_established(self.broker) - - - def on_setup(self, payload): - """Called with different kinds of frames - during setup""" - - if self.cancelled: - # We were declaring this, but during this situation this - # consumer was cancelled. Close the channel and things. - self.broker.connection.send(self.channel_id, ChannelClose(0, 'Consumer cancelled', 0, 0)) - return - - if isinstance(payload, ChannelOpenOk): - # Do we need to declare the exchange? - - if self.queue.exchange is not None: - self.broker.connection.method_and_watch( - self.channel_id, - ExchangeDeclare(self.queue.exchange.name.encode('utf8'), - self.queue.exchange.type.encode('utf8'), - False, - self.queue.exchange.durable, - self.queue.exchange.auto_delete, - False, - False, - []), - ExchangeDeclareOk, - self.on_setup - ) - else: - self.on_setup(ExchangeDeclareOk()) - - elif isinstance(payload, ExchangeDeclareOk): - # Declare the queue - - name = b'' if self.queue.anonymous else self.queue.name.encode('utf8') - - self.broker.connection.method_and_watch( - self.channel_id, - QueueDeclare( - name, - False, - self.queue.durable, - self.queue.exclusive, - self.queue.auto_delete, - False, - [] - ), - QueueDeclareOk, - self.on_setup - ) - - elif isinstance(payload, QueueDeclareOk): - # did we need an anonymous name? - if self.queue.anonymous: - self.queue.name = payload.queue_name.decode('utf8') - - # We need any form of binding. - xchg_name = b'' if self.queue.exchange is None else self.queue.exchange.name.encode('utf8') - - self.broker.connection.method_and_watch( - self.channel_id, - QueueBind( - self.queue.name.encode('utf8'), - xchg_name, - b'', - False, - [] - ), - QueueBindOk, - self.on_setup - ) - elif isinstance(payload, QueueBindOk): - # itadakimasu - self.broker.connection.method_and_watch( - self.channel_id, - BasicConsume( - self.queue.name.encode('utf8'), - self.queue.name.encode('utf8'), - False, - self.no_ack, - self.queue.exclusive, - False, - [] - ), - BasicConsumeOk, - self.on_setup - ) - - elif isinstance(payload, BasicConsumeOk): - # AWWW RIGHT~!!! - self.state = ST_ONLINE - - self.broker.connection.watch(HeaderOrBodyWatch(self.channel_id, self.on_header_or_body_or_delivery)) - mw = MethodWatch(self.channel_id, BasicDeliver, self.on_header_or_body_or_delivery) - mw.oneshot = False - self.broker.connection.watch(mw) - - def on_uplink_established(self, broker): - """Consumer was created or uplink was regained. Try to declare it""" - if self.cancelled: - return # it's OK. - - self.broker = broker - - self.state = ST_SYNCING - self.channel_id = self.broker.free_channels.pop() - - self.broker.connection.watch_for_method(self.channel_id, - (ChannelClose, ChannelCloseOk, BasicCancel), - self.on_close, - on_fail=self.on_close) - - self.broker.connection.method_and_watch( - self.channel_id, - ChannelOpen(), - ChannelOpenOk, - self.on_setup - ) diff --git a/coolamqp/framing/compilation/textcode_fields.py b/coolamqp/framing/compilation/textcode_fields.py index fee06ff..e8fb3c8 100644 --- a/coolamqp/framing/compilation/textcode_fields.py +++ b/coolamqp/framing/compilation/textcode_fields.py @@ -215,6 +215,10 @@ def get_serializer(fields, prefix='', indent_level=2): emit_single_struct_pack() emit('buf.write(%s)', nam) elif field.basic_type == 'table': + if len(bits) > 0: + emit_bits() + if len(formats) > 0: + emit_single_struct_pack() emit('enframe_table(buf, %s)', nam) else: formats.append(BASIC_TYPES[field.basic_type][1]) diff --git a/coolamqp/framing/definitions.py b/coolamqp/framing/definitions.py index 7104e89..2959949 100644 --- a/coolamqp/framing/definitions.py +++ b/coolamqp/framing/definitions.py @@ -408,8 +408,9 @@ class ConnectionStart(AMQPMethodPayload): self.locales = locales def write_arguments(self, buf): + buf.write(struct.pack('!BB', self.version_major, self.version_minor)) enframe_table(buf, self.server_properties) - buf.write(struct.pack('!BBI', self.version_major, self.version_minor, len(self.mechanisms))) + buf.write(struct.pack('!I', len(self.mechanisms))) buf.write(self.mechanisms) buf.write(struct.pack('!I', len(self.locales))) buf.write(self.locales) @@ -1121,8 +1122,8 @@ class ExchangeBind(AMQPMethodPayload): buf.write(self.source) buf.write(struct.pack('!B', len(self.routing_key))) buf.write(self.routing_key) - enframe_table(buf, self.arguments) buf.write(struct.pack('!B', (self.no_wait << 0))) + enframe_table(buf, self.arguments) def get_size(self): return 6 + len(self.destination) + len(self.source) + len(self.routing_key) + frame_table_size(self.arguments) @@ -1267,8 +1268,8 @@ class ExchangeDeclare(AMQPMethodPayload): buf.write(self.exchange) buf.write(struct.pack('!B', len(self.type_))) buf.write(self.type_) - enframe_table(buf, self.arguments) buf.write(struct.pack('!B', (self.passive << 0) | (self.durable << 1) | (self.auto_delete << 2) | (self.internal << 3) | (self.no_wait << 4))) + enframe_table(buf, self.arguments) def get_size(self): return 5 + len(self.exchange) + len(self.type_) + frame_table_size(self.arguments) @@ -1476,8 +1477,8 @@ class ExchangeUnbind(AMQPMethodPayload): buf.write(self.source) buf.write(struct.pack('!B', len(self.routing_key))) buf.write(self.routing_key) - enframe_table(buf, self.arguments) buf.write(struct.pack('!B', (self.no_wait << 0))) + enframe_table(buf, self.arguments) def get_size(self): return 6 + len(self.destination) + len(self.source) + len(self.routing_key) + frame_table_size(self.arguments) @@ -1613,8 +1614,8 @@ class QueueBind(AMQPMethodPayload): buf.write(self.exchange) buf.write(struct.pack('!B', len(self.routing_key))) buf.write(self.routing_key) - enframe_table(buf, self.arguments) buf.write(struct.pack('!B', (self.no_wait << 0))) + enframe_table(buf, self.arguments) def get_size(self): return 6 + len(self.queue) + len(self.exchange) + len(self.routing_key) + frame_table_size(self.arguments) @@ -1753,8 +1754,8 @@ class QueueDeclare(AMQPMethodPayload): buf.write(b'\x00\x00') buf.write(struct.pack('!B', len(self.queue))) buf.write(self.queue) - enframe_table(buf, self.arguments) buf.write(struct.pack('!B', (self.passive << 0) | (self.durable << 1) | (self.exclusive << 2) | (self.auto_delete << 3) | (self.no_wait << 4))) + enframe_table(buf, self.arguments) def get_size(self): return 4 + len(self.queue) + frame_table_size(self.arguments) @@ -2424,8 +2425,8 @@ class BasicConsume(AMQPMethodPayload): buf.write(self.queue) buf.write(struct.pack('!B', len(self.consumer_tag))) buf.write(self.consumer_tag) - enframe_table(buf, self.arguments) buf.write(struct.pack('!B', (self.no_local << 0) | (self.no_ack << 1) | (self.exclusive << 2) | (self.no_wait << 3))) + enframe_table(buf, self.arguments) def get_size(self): return 5 + len(self.queue) + len(self.consumer_tag) + frame_table_size(self.arguments) diff --git a/coolamqp/uplink/__init__.py b/coolamqp/uplink/__init__.py index fb9906e..93359f8 100644 --- a/coolamqp/uplink/__init__.py +++ b/coolamqp/uplink/__init__.py @@ -11,5 +11,5 @@ Watches will fire upon an event triggering them. """ from __future__ import absolute_import, division, print_function -from coolamqp.uplink.connection import Connection +from coolamqp.uplink.connection import Connection, HeaderOrBodyWatch, MethodWatch from coolamqp.uplink.listener import ListenerThread diff --git a/coolamqp/uplink/connection/__init__.py b/coolamqp/uplink/connection/__init__.py index f5fd1b4..f32f3c9 100644 --- a/coolamqp/uplink/connection/__init__.py +++ b/coolamqp/uplink/connection/__init__.py @@ -12,4 +12,5 @@ Connection is something that can: from __future__ import absolute_import, division, print_function from coolamqp.uplink.connection.connection import Connection -from coolamqp.uplink.connection.watches import FailWatch, Watch +from coolamqp.uplink.connection.watches import FailWatch, Watch, HeaderOrBodyWatch, MethodWatch +from coolamqp.uplink.connection.states import ST_OFFLINE, ST_CONNECTING, ST_ONLINE diff --git a/coolamqp/uplink/connection/connection.py b/coolamqp/uplink/connection/connection.py index 3130272..53aae6a 100644 --- a/coolamqp/uplink/connection/connection.py +++ b/coolamqp/uplink/connection/connection.py @@ -51,6 +51,8 @@ class Connection(object): self.state = ST_CONNECTING + self.callables_on_connected = [] # list of callable/0 + # Negotiated connection parameters - handshake will fill this in self.free_channels = [] # attaches can use this for shit. # WARNING: thread safety of this hinges on atomicity of .pop or .append @@ -58,10 +60,26 @@ class Connection(object): self.heartbeat = None self.extensions = [] + def call_on_connected(self, callable): + """ + Register a callable to be called when this links to the server. + + If you call it while the connection IS up, callable will be called even before this returns. + + :param callable: callable/0 to call + """ + if self.state == ST_ONLINE: + callable() + else: + self.callables_on_connected.append(callable) + def on_connected(self): """Called by handshaker upon reception of final connection.open-ok""" self.state = ST_ONLINE + while len(self.callables_on_connected) > 0: + self.callables_on_connected.pop()() + def start(self): """ Start processing events for this connect. Create the socket, @@ -95,8 +113,10 @@ class Connection(object): """ self.state = ST_OFFLINE # Update state - for channel, watches in six.iteritems(self.watches): # Run all watches - failed - for watch in watches: + watchlists = [self.watches[channel] for channel in self.watches] + + for watchlist in watchlists: # Run all watches - failed + for watch in watchlist: watch.failed() self.watches = {} # Clear the watch list @@ -110,6 +130,7 @@ class Connection(object): self.on_fail() # it does not make sense to prolong the agony if isinstance(payload, ConnectionClose): + print(payload.reply_code, payload.reply_text) self.send([AMQPMethodFrame(0, ConnectionCloseOk())]) elif isinstance(payload, ConnectionCloseOk): self.send(None) @@ -120,6 +141,11 @@ class Connection(object): :param reason: optional human-readable reason for this action """ if frames is not None: + for frame in frames: + if isinstance(frame, AMQPMethodFrame): + print('Sending ', frame.payload) + else: + print('Sending ', frame) self.sendf.send(frames) else: # Listener socket will kill us when time is right @@ -175,6 +201,12 @@ class Connection(object): """ self.listener_socket.oneshot(delay, callback) + def unwatch_all(self, channel_id): + """ + Remove all watches from specified channel + """ + self.watches.pop(channel_id, None) + def watch(self, watch): """ Register a watch. @@ -185,12 +217,22 @@ class Connection(object): else: self.watches[watch.channel].append(watch) - def watch_for_method(self, channel, method, callback): + def watch_for_method(self, channel, method, callback, on_fail=None): """ :param channel: channel to monitor :param method: AMQPMethodPayload class or tuple of AMQPMethodPayload classes :param callback: callable(AMQPMethodPayload instance) """ - mw = MethodWatch(channel, method, callback) + mw = MethodWatch(channel, method, callback, on_end=on_fail) self.watch(mw) return mw + + def method_and_watch(self, channel_id, method_payload, method_or_methods, callback): + """ + A syntactic sugar for + + .watch_for_method(channel_id, method_or_methdods, callback) + .send([AMQPMethodFrame(channel_id, method_payload)]) + """ + self.watch_for_method(channel_id, method_or_methods, callback) + self.send([AMQPMethodFrame(channel_id, method_payload)]) diff --git a/coolamqp/uplink/connection/watches.py b/coolamqp/uplink/connection/watches.py index 536147e..4f73cfb 100644 --- a/coolamqp/uplink/connection/watches.py +++ b/coolamqp/uplink/connection/watches.py @@ -1,7 +1,7 @@ # coding=UTF-8 from __future__ import absolute_import, division, print_function -from coolamqp.framing.frames import AMQPMethodFrame, AMQPHeartbeatFrame +from coolamqp.framing.frames import AMQPMethodFrame, AMQPHeartbeatFrame, AMQPHeaderFrame, AMQPBodyFrame class Watch(object): @@ -71,6 +71,21 @@ class HeartbeatWatch(Watch): return False +class HeaderOrBodyWatch(Watch): + """ + A multi-shot watch listening for AMQP header or body frames + """ + def __init__(self, channel, callable): + Watch.__init__(self, channel, False) + self.callable = callable + + def is_triggered_by(self, frame): + if not (isinstance(frame, (AMQPHeaderFrame, AMQPBodyFrame))): + return False + self.callable(frame) + return True + + class MethodWatch(Watch): """ One-shot watch listening for methods. diff --git a/coolamqp/uplink/handshake.py b/coolamqp/uplink/handshake.py index 4b5b25a..87cd6df 100644 --- a/coolamqp/uplink/handshake.py +++ b/coolamqp/uplink/handshake.py @@ -10,7 +10,8 @@ from coolamqp.framing.frames import AMQPMethodFrame from coolamqp.uplink.connection.states import ST_ONLINE SUPPORTED_EXTENSIONS = [ - b'publisher_confirms' + b'publisher_confirms', + b'consumer_cancel_notify' ] CLIENT_DATA = [ diff --git a/coolamqp/uplink/listener/epoll_listener.py b/coolamqp/uplink/listener/epoll_listener.py index 1c4e800..fa38273 100644 --- a/coolamqp/uplink/listener/epoll_listener.py +++ b/coolamqp/uplink/listener/epoll_listener.py @@ -118,8 +118,6 @@ class EpollListener(object): sock.fileno(), callback )) - else: - print('oneshot from nowhere') def register(self, sock, on_read=lambda data: None, on_fail=lambda: None): diff --git a/setup.cfg b/setup.cfg index 81af2bb..cdd41f0 100644 --- a/setup.cfg +++ b/setup.cfg @@ -3,3 +3,6 @@ description-file = README.md [pycodestyle] max-line-length=120 + +[bdist_wheel] +universal=1 diff --git a/tests/run.py b/tests/run.py index 7f49260..522664a 100644 --- a/tests/run.py +++ b/tests/run.py @@ -6,6 +6,9 @@ from coolamqp.connection import NodeDefinition from coolamqp.uplink import Connection import logging +from coolamqp.attaches import Consumer +from coolamqp.messages import Queue + NODE = NodeDefinition('127.0.0.1', 5672, 'user', 'user', heartbeat=5) logging.basicConfig(level=logging.INFO) @@ -15,9 +18,13 @@ if __name__ == '__main__': lt.start() con = Connection(NODE, lt) - con.start() - time.sleep(50) + + cons = Consumer(Queue('siema-eniu', auto_delete=True, exclusive=True)) + cons.attach(con) + + while True: + time.sleep(10) lt.terminate() -- GitLab