From 300ac1180e5009e47e3e52b8a117b37f5a6fe3e0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Ma=C5=9Blanka?= <piotr.maslanka@henrietta.com.pl> Date: Sun, 1 Jan 2017 21:32:28 +0100 Subject: [PATCH] heartbeat + session transcripting --- coolamqp/backends/pyamqp.py | 169 ------------------ coolamqp/connection/__init__.py | 5 + coolamqp/connection/definition.py | 10 +- coolamqp/connection/orders.py | 39 ++++ coolamqp/connection/state.py | 78 ++++++++ coolamqp/framing/base.py | 3 + .../compilation/compile_definitions.py | 2 +- coolamqp/framing/definitions.py | 36 ++-- coolamqp/scaffold.py | 43 ----- coolamqp/uplink/__init__.py | 12 +- coolamqp/uplink/connection/__init__.py | 1 + coolamqp/uplink/connection/connection.py | 59 ++++-- coolamqp/uplink/connection/watches.py | 96 ++++++++++ coolamqp/uplink/handshake.py | 3 +- coolamqp/uplink/heartbeat.py | 4 +- tests/run.py | 9 +- .../test_connection}/__init__.py | 0 tests/test_connection/test_state.py | 36 ++++ 18 files changed, 345 insertions(+), 260 deletions(-) delete mode 100644 coolamqp/backends/pyamqp.py create mode 100644 coolamqp/connection/orders.py create mode 100644 coolamqp/connection/state.py delete mode 100644 coolamqp/scaffold.py create mode 100644 coolamqp/uplink/connection/watches.py rename {coolamqp/cluster => tests/test_connection}/__init__.py (100%) create mode 100644 tests/test_connection/test_state.py diff --git a/coolamqp/backends/pyamqp.py b/coolamqp/backends/pyamqp.py deleted file mode 100644 index bebc6de..0000000 --- a/coolamqp/backends/pyamqp.py +++ /dev/null @@ -1,169 +0,0 @@ -# coding=UTF-8 -"""Backend using pyamqp""" -from __future__ import division -import amqp -import socket -import six -import functools -import logging -from coolamqp.backends.base import AMQPBackend, RemoteAMQPError, ConnectionFailedError -import monotonic - - -logger = logging.getLogger(__name__) - - -def translate_exceptions(fun): - """ - Translates pyamqp's exceptions to CoolAMQP's - - py-amqp's exceptions are less than intuitive, so expect many special cases - """ - @functools.wraps(fun) - def q(*args, **kwargs): - try: - return fun(*args, **kwargs) - except (amqp.exceptions.ConsumerCancelled): - # I did not expect those here. Channel must be really bad. - raise ConnectionFailedError('WTF: '+(e.message if six.PY2 else e.args[0])) - except (amqp.exceptions.Blocked, ) as e: - pass # too bad - except (amqp.RecoverableChannelError, - amqp.exceptions.NotFound, - amqp.exceptions.AccessRefused) as e: - - - raise RemoteAMQPError(e.reply_code, e.reply_text) - except (IOError, - amqp.ConnectionForced, - amqp.IrrecoverableChannelError, - amqp.exceptions.UnexpectedFrame) as e: - raise ConnectionFailedError(e.message if six.PY2 else e.args[0]) - return q - - -class PyAMQPBackend(AMQPBackend): - @translate_exceptions - def __init__(self, node, cluster_handler_thread): - AMQPBackend.__init__(self, node, cluster_handler_thread) - - self.connection = amqp.Connection(host=node.host, - userid=node.user, - password=node.password, - virtual_host=node.virtual_host, - heartbeat=node.heartbeat or 0) - try: - self.connection.connect() #todo what does this raise? - except AttributeError: - pass # this does not always have to exist - self.channel = self.connection.channel() - self.channel.auto_decode = False - self.heartbeat = node.heartbeat or 0 - self.last_heartbeat_at = monotonic.monotonic() - - def shutdown(self): - AMQPBackend.shutdown(self) - print 'BACKEND SHUTDOWN START' - try: - self.channel.close() - except: - pass - try: - self.connection.close() - except: - pass - print 'BACKEND SHUTDOWN COMPLETE' - - @translate_exceptions - def process(self, max_time=1): - try: - if self.heartbeat > 0: - if monotonic.monotonic() - self.last_heartbeat_at > (self.heartbeat / 2): - self.connection.heartbeat_tick(rate=self.heartbeat) - self.last_heartbeat_at = monotonic.monotonic() - self.connection.drain_events(max_time) - except socket.timeout as e: - pass - - @translate_exceptions - def basic_cancel(self, consumer_tag): - self.channel.basic_cancel(consumer_tag) - - @translate_exceptions - def basic_publish(self, message, exchange, routing_key): - # convert this to pyamqp's Message - a = amqp.Message(six.binary_type(message.body), - **message.properties) - - self.channel.basic_publish(a, exchange=exchange.name, routing_key=routing_key) - - @translate_exceptions - def exchange_declare(self, exchange): - self.channel.exchange_declare(exchange.name, exchange.type, durable=exchange.durable, - auto_delete=exchange.auto_delete) - - @translate_exceptions - def queue_bind(self, queue, exchange, routing_key=''): - self.channel.queue_bind(queue.name, exchange.name, routing_key) - - @translate_exceptions - def basic_ack(self, delivery_tag): - self.channel.basic_ack(delivery_tag, multiple=False) - - @translate_exceptions - def exchange_delete(self, exchange): - self.channel.exchange_delete(exchange.name) - - @translate_exceptions - def basic_qos(self, prefetch_size, prefetch_count, global_): - self.channel.basic_qos(prefetch_size, prefetch_count, global_) - - @translate_exceptions - def queue_delete(self, queue): - self.channel.queue_delete(queue.name) - - @translate_exceptions - def basic_reject(self, delivery_tag): - self.channel.basic_reject(delivery_tag, True) - - @translate_exceptions - def queue_declare(self, queue): - """ - Declare a queue. - - This will change queue's name if anonymous - :param queue: Queue - """ - if queue.anonymous: - queue.name = '' - - qname, mc, cc = self.channel.queue_declare(queue.name, - durable=queue.durable, - exclusive=queue.exclusive, - auto_delete=queue.auto_delete) - if queue.anonymous: - queue.name = qname - - @translate_exceptions - def basic_consume(self, queue, no_ack=False): - """ - Start consuming from a queue - :param queue: Queue object - """ - self.channel.basic_consume(queue.name, - consumer_tag=queue.consumer_tag, - exclusive=queue.exclusive, - no_ack=no_ack, - callback=self.__on_message, - on_cancel=self.__on_consumercancelled) - - def __on_consumercancelled(self, consumer_tag): - self.cluster_handler_thread._on_consumercancelled(consumer_tag) - - def __on_message(self, message): - assert isinstance(message.body, six.binary_type) - self.cluster_handler_thread._on_recvmessage(message.body, - message.delivery_info['exchange'], - message.delivery_info['routing_key'], - message.delivery_info['delivery_tag'], - message.properties) diff --git a/coolamqp/connection/__init__.py b/coolamqp/connection/__init__.py index 9f2b35b..569c71d 100644 --- a/coolamqp/connection/__init__.py +++ b/coolamqp/connection/__init__.py @@ -1,2 +1,7 @@ # coding=UTF-8 +""" +This does things relating to a single connection +""" from __future__ import absolute_import, division, print_function + +from coolamqp.connection.definition import NodeDefinition diff --git a/coolamqp/connection/definition.py b/coolamqp/connection/definition.py index 82778d3..58682be 100644 --- a/coolamqp/connection/definition.py +++ b/coolamqp/connection/definition.py @@ -8,8 +8,9 @@ class NodeDefinition(object): Definition of a node """ - def __init__(self, host, port, user, password, virtual_host='/', amqp_version='0.9.1', heartbeat=None): + def __init__(self, host, port, user, password, virtual_host='/', heartbeat=None): """ + All necessary information to establish a link to a broker. :param host: TCP host, str :param port: TCP port, int @@ -23,5 +24,10 @@ class NodeDefinition(object): self.host = host self.port = port self.virtual_host = virtual_host - self.amqp_version = amqp_version self.heartbeat = heartbeat + + def to_connection_object(self): + """ + Return a Connection object that + :return: + """ diff --git a/coolamqp/connection/orders.py b/coolamqp/connection/orders.py new file mode 100644 index 0000000..9c43262 --- /dev/null +++ b/coolamqp/connection/orders.py @@ -0,0 +1,39 @@ +# coding=UTF-8 +from __future__ import absolute_import, division, print_function +""" +You can feed Broker with some orders. +They work pretty much like futures. +""" +import threading + + +class BaseOrder(object): + def __init__(self, on_completed=None, on_failed=None): + self.lock = threading.Lock() + self.lock.acquire() + self.on_completed = on_completed + self.on_failed = on_failed + + def on_done(self): + if self.on_completed is not None: + self.on_completed() + self.lock.release() + + def on_fail(self): + if self.on_failed is not None: + self.on_failed() + self.lock.release() + + def wait(self): + self.lock.acquire() + + +class LinkSetup(BaseOrder): + """ + Connecting to broker + """ + +class ConsumeOnChannel(BaseOrder): + """ + + """ \ No newline at end of file diff --git a/coolamqp/connection/state.py b/coolamqp/connection/state.py new file mode 100644 index 0000000..b1c75e4 --- /dev/null +++ b/coolamqp/connection/state.py @@ -0,0 +1,78 @@ +# coding=UTF-8 +from __future__ import absolute_import, division, print_function +import six +from coolamqp.uplink import Handshaker +from coolamqp.connection.orders import LinkSetup +from coolamqp.framing.definitions import ChannelOpenOk, ChannelOpen +from coolamqp.framing.frames import AMQPMethodFrame + + +class Broker(object): + """ + A connection to a single broker + """ + + def __init__(self, connection, node_definition): + """ + :param connection: coolamqp.uplink.Connection, before .start is called + :param node_definition: node definition that will be used to handshake + """ + self.connection = connection + self.node_definition = node_definition + + self.free_channels = [] # list of channels usable for consuming shit + + + def connect(self): + """Return an LinkSetup order to get when it connects""" + ls = LinkSetup() + + def send_channel_opened(): + # OK, opened + ls.on_done() + + def handshake_complete(): + if handshaker.channel_max < 1: + self.connection.send(None, 'channel_max < 1 !!!') + ls.on_fail() + return + + for free_chan in six.moves.xrange(2, handshaker.channel_max+1): + self.free_channels.append(free_chan) + + # It's OK, open channel 1 for sending messages + self.connection.watch_for_method(1, ChannelOpenOk, send_channel_opened) + self.connection.send([AMQPMethodFrame(1, ChannelOpen())]) + + handshaker = Handshaker( + self.connection, + self.node_definition.user, + self.node_definition.password, + self.node_definition.virtual_host, + handshake_complete, + ls.on_failed, + heartbeat=self.node_definition.heartbeat or 0 + ) + + def send_channel_opened(frame): + # OK, opened + ls.on_done() + + def handshake_complete(): + if handshaker.channel_max < 1: + self.connection.send(None, 'channel_max < 1 !!!') + ls.on_fail() + return + + for free_chan in six.moves.xrange(2, handshaker.channel_max+1): + self.free_channels.append(free_chan) + + # It's OK, open channel 1 for sending messages + self.connection.watch_for_method(1, ChannelOpenOk, send_channel_opened) + self.connection.send([AMQPMethodFrame(1, ChannelOpen())]) + + self.connection.start() + + return ls + + diff --git a/coolamqp/framing/base.py b/coolamqp/framing/base.py index 08d7aae..0bc471b 100644 --- a/coolamqp/framing/base.py +++ b/coolamqp/framing/base.py @@ -151,6 +151,9 @@ class AMQPMethodPayload(AMQPPayload): :return: int, size of argument section """ + if self.IS_CONTENT_STATIC: + return len(self.STATIC_CONTENT)-4-4-1 # minus length, class, method, frame_end + raise NotImplementedError() def write_arguments(self, buf): diff --git a/coolamqp/framing/compilation/compile_definitions.py b/coolamqp/framing/compilation/compile_definitions.py index 41025f9..ff86fb4 100644 --- a/coolamqp/framing/compilation/compile_definitions.py +++ b/coolamqp/framing/compilation/compile_definitions.py @@ -344,7 +344,7 @@ Field = collections.namedtuple('Field', ('name', 'type', 'basic_type', 'reserved line(''' STATIC_CONTENT = %s # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END ''', - to_code_binary(struct.pack('!LBB', static_size + 4, cls.index, method.index) + \ + to_code_binary(struct.pack('!LHH', static_size + 4, cls.index, method.index) + \ method.get_static_body() + \ struct.pack('!B', FRAME_END))) diff --git a/coolamqp/framing/definitions.py b/coolamqp/framing/definitions.py index d45411e..d0b25b0 100644 --- a/coolamqp/framing/definitions.py +++ b/coolamqp/framing/definitions.py @@ -191,7 +191,7 @@ class ConnectionCloseOk(AMQPMethodPayload): IS_SIZE_STATIC = True # this means that argument part has always the same length IS_CONTENT_STATIC = True # this means that argument part has always the same content - STATIC_CONTENT = b'\x00\x00\x00\x04\x0A\x33\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END + STATIC_CONTENT = b'\x00\x00\x00\x04\x00\x0A\x00\x33\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END def __init__(self): """ @@ -279,7 +279,7 @@ class ConnectionOpenOk(AMQPMethodPayload): IS_SIZE_STATIC = False # this means that argument part has always the same length IS_CONTENT_STATIC = True # this means that argument part has always the same content - STATIC_CONTENT = b'\x00\x00\x00\x04\x0A\x29\x00\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END + STATIC_CONTENT = b'\x00\x00\x00\x04\x00\x0A\x00\x29\x00\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END # See constructor pydoc for details FIELDS = [ @@ -788,7 +788,7 @@ class ChannelCloseOk(AMQPMethodPayload): IS_SIZE_STATIC = True # this means that argument part has always the same length IS_CONTENT_STATIC = True # this means that argument part has always the same content - STATIC_CONTENT = b'\x00\x00\x00\x04\x14\x29\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END + STATIC_CONTENT = b'\x00\x00\x00\x04\x00\x14\x00\x29\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END def __init__(self): """ @@ -915,7 +915,7 @@ class ChannelOpen(AMQPMethodPayload): IS_SIZE_STATIC = False # this means that argument part has always the same length IS_CONTENT_STATIC = True # this means that argument part has always the same content - STATIC_CONTENT = b'\x00\x00\x00\x05\x14\x0A\x00\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END + STATIC_CONTENT = b'\x00\x00\x00\x05\x00\x14\x00\x0A\x00\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END # See constructor pydoc for details FIELDS = [ @@ -952,7 +952,7 @@ class ChannelOpenOk(AMQPMethodPayload): IS_SIZE_STATIC = False # this means that argument part has always the same length IS_CONTENT_STATIC = True # this means that argument part has always the same content - STATIC_CONTENT = b'\x00\x00\x00\x05\x14\x0B\x00\x00\x00\x00\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END + STATIC_CONTENT = b'\x00\x00\x00\x05\x00\x14\x00\x0B\x00\x00\x00\x00\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END # See constructor pydoc for details FIELDS = [ @@ -1168,7 +1168,7 @@ class ExchangeDeclareOk(AMQPMethodPayload): IS_SIZE_STATIC = True # this means that argument part has always the same length IS_CONTENT_STATIC = True # this means that argument part has always the same content - STATIC_CONTENT = b'\x00\x00\x00\x04\x28\x0B\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END + STATIC_CONTENT = b'\x00\x00\x00\x04\x00\x28\x00\x0B\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END def __init__(self): """ @@ -1197,7 +1197,7 @@ class ExchangeDeleteOk(AMQPMethodPayload): IS_SIZE_STATIC = True # this means that argument part has always the same length IS_CONTENT_STATIC = True # this means that argument part has always the same content - STATIC_CONTENT = b'\x00\x00\x00\x04\x28\x15\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END + STATIC_CONTENT = b'\x00\x00\x00\x04\x00\x28\x00\x15\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END def __init__(self): """ @@ -1334,7 +1334,7 @@ class QueueBindOk(AMQPMethodPayload): IS_SIZE_STATIC = True # this means that argument part has always the same length IS_CONTENT_STATIC = True # this means that argument part has always the same content - STATIC_CONTENT = b'\x00\x00\x00\x04\x32\x15\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END + STATIC_CONTENT = b'\x00\x00\x00\x04\x00\x32\x00\x15\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END def __init__(self): """ @@ -1826,7 +1826,7 @@ class QueueUnbindOk(AMQPMethodPayload): IS_SIZE_STATIC = True # this means that argument part has always the same length IS_CONTENT_STATIC = True # this means that argument part has always the same content - STATIC_CONTENT = b'\x00\x00\x00\x04\x32\x33\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END + STATIC_CONTENT = b'\x00\x00\x00\x04\x00\x32\x00\x33\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END def __init__(self): """ @@ -2503,7 +2503,7 @@ class BasicGetEmpty(AMQPMethodPayload): IS_SIZE_STATIC = False # this means that argument part has always the same length IS_CONTENT_STATIC = True # this means that argument part has always the same content - STATIC_CONTENT = b'\x00\x00\x00\x0D\x3C\x48\x00\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END + STATIC_CONTENT = b'\x00\x00\x00\x0D\x00\x3C\x00\x48\x00\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END # See constructor pydoc for details FIELDS = [ @@ -2700,7 +2700,7 @@ class BasicQosOk(AMQPMethodPayload): IS_SIZE_STATIC = True # this means that argument part has always the same length IS_CONTENT_STATIC = True # this means that argument part has always the same content - STATIC_CONTENT = b'\x00\x00\x00\x04\x3C\x0B\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END + STATIC_CONTENT = b'\x00\x00\x00\x04\x00\x3C\x00\x0B\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END def __init__(self): """ @@ -2957,7 +2957,7 @@ class BasicRecoverOk(AMQPMethodPayload): IS_SIZE_STATIC = True # this means that argument part has always the same length IS_CONTENT_STATIC = True # this means that argument part has always the same content - STATIC_CONTENT = b'\x00\x00\x00\x04\x3C\x6F\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END + STATIC_CONTENT = b'\x00\x00\x00\x04\x00\x3C\x00\x6F\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END def __init__(self): """ @@ -3004,7 +3004,7 @@ class TxCommit(AMQPMethodPayload): IS_SIZE_STATIC = True # this means that argument part has always the same length IS_CONTENT_STATIC = True # this means that argument part has always the same content - STATIC_CONTENT = b'\x00\x00\x00\x04\x5A\x14\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END + STATIC_CONTENT = b'\x00\x00\x00\x04\x00\x5A\x00\x14\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END def __init__(self): """ @@ -3034,7 +3034,7 @@ class TxCommitOk(AMQPMethodPayload): IS_SIZE_STATIC = True # this means that argument part has always the same length IS_CONTENT_STATIC = True # this means that argument part has always the same content - STATIC_CONTENT = b'\x00\x00\x00\x04\x5A\x15\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END + STATIC_CONTENT = b'\x00\x00\x00\x04\x00\x5A\x00\x15\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END def __init__(self): """ @@ -3066,7 +3066,7 @@ class TxRollback(AMQPMethodPayload): IS_SIZE_STATIC = True # this means that argument part has always the same length IS_CONTENT_STATIC = True # this means that argument part has always the same content - STATIC_CONTENT = b'\x00\x00\x00\x04\x5A\x1E\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END + STATIC_CONTENT = b'\x00\x00\x00\x04\x00\x5A\x00\x1E\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END def __init__(self): """ @@ -3096,7 +3096,7 @@ class TxRollbackOk(AMQPMethodPayload): IS_SIZE_STATIC = True # this means that argument part has always the same length IS_CONTENT_STATIC = True # this means that argument part has always the same content - STATIC_CONTENT = b'\x00\x00\x00\x04\x5A\x1F\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END + STATIC_CONTENT = b'\x00\x00\x00\x04\x00\x5A\x00\x1F\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END def __init__(self): """ @@ -3126,7 +3126,7 @@ class TxSelect(AMQPMethodPayload): IS_SIZE_STATIC = True # this means that argument part has always the same length IS_CONTENT_STATIC = True # this means that argument part has always the same content - STATIC_CONTENT = b'\x00\x00\x00\x04\x5A\x0A\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END + STATIC_CONTENT = b'\x00\x00\x00\x04\x00\x5A\x00\x0A\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END def __init__(self): """ @@ -3156,7 +3156,7 @@ class TxSelectOk(AMQPMethodPayload): IS_SIZE_STATIC = True # this means that argument part has always the same length IS_CONTENT_STATIC = True # this means that argument part has always the same content - STATIC_CONTENT = b'\x00\x00\x00\x04\x5A\x0B\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END + STATIC_CONTENT = b'\x00\x00\x00\x04\x00\x5A\x00\x0B\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END def __init__(self): """ diff --git a/coolamqp/scaffold.py b/coolamqp/scaffold.py deleted file mode 100644 index 7c3d245..0000000 --- a/coolamqp/scaffold.py +++ /dev/null @@ -1,43 +0,0 @@ -# coding=UTF-8 -""" -Utilities for building the CoolAMQP engine -""" -from __future__ import absolute_import, division, print_function -import functools -import threading - - -class Synchronized(object): - """Protects access to methods with a lock""" - def __init__(self): - self.lock = threading.Lock() - - def protect(self): - """Make the function body lock-protected""" - def outer(fun): - @functools.wraps(fun) - def inner(*args, **kwargs): - with self.lock: - return fun(*args, **kwargs) - return inner - return outer - - -class AcceptsFrames(object): - """Base class for objects that accept AMQP frames""" - - def on_frame(self, amqp_frame): - """ - :type amqp_frame: AMQPFrame object - """ - - -class EmitsFrames(object): - """Base class for objects that send AMQP frames somewhere""" - - def wire_frame_to(self, acceptor): - """ - Configure this object to send frames somewhere. - - :param acceptor: an AcceptsFrames instance or callable(AMQPMethod instance) - """ \ No newline at end of file diff --git a/coolamqp/uplink/__init__.py b/coolamqp/uplink/__init__.py index f124517..11baba9 100644 --- a/coolamqp/uplink/__init__.py +++ b/coolamqp/uplink/__init__.py @@ -1,11 +1,17 @@ # coding=UTF-8 """ -The layer that allows you to attach Reactors, -ie. objects that are informed upon receiving a frame or connection dying. -They can also send frames themselves. + +Core object here is Connection. This package: + - establishes basic connectivity (up to the point where you can open channels yourself) + - takes care of heartbeats + +You can wait for a particular frame by setting watches on connections. +Watches will fire upon an event triggering them. + """ from __future__ import absolute_import, division, print_function from coolamqp.uplink.connection import Connection from coolamqp.uplink.handshake import Handshaker from coolamqp.uplink.listener import ListenerThread +from coolamqp.uplink.connection import FailWatch, Watch diff --git a/coolamqp/uplink/connection/__init__.py b/coolamqp/uplink/connection/__init__.py index beed0f9..148c579 100644 --- a/coolamqp/uplink/connection/__init__.py +++ b/coolamqp/uplink/connection/__init__.py @@ -9,3 +9,4 @@ Connection is something that can: from __future__ import absolute_import, division, print_function from coolamqp.uplink.connection.connection import Connection +from coolamqp.uplink.connection.watches import FailWatch, Watch diff --git a/coolamqp/uplink/connection/connection.py b/coolamqp/uplink/connection/connection.py index 20c2aa2..fa37514 100644 --- a/coolamqp/uplink/connection/connection.py +++ b/coolamqp/uplink/connection/connection.py @@ -8,6 +8,7 @@ from coolamqp.uplink.connection.recv_framer import ReceivingFramer from coolamqp.uplink.connection.send_framer import SendingFramer from coolamqp.framing.frames import AMQPMethodFrame, AMQPHeartbeatFrame +from coolamqp.uplink.connection.watches import MethodWatch, FailWatch logger = logging.getLogger(__name__) @@ -26,11 +27,8 @@ class Connection(object): self.failed = False self.transcript = None - self.method_watches = {} # channel => [AMQPMethodPayload instance, callback] - - # to call if an unwatched frame is caught - self.on_heartbeat = lambda: None - self.unwatched_frame = lambda frame: None # callable(AMQPFrame instance) + self.watches = {} # channel => [Watch object] + self.fail_watches = [] def start(self): """ @@ -43,8 +41,21 @@ class Connection(object): self.sendf = SendingFramer(self.listener_socket.send) def on_fail(self): + """Underlying connection is closed""" if self.transcript is not None: self.transcript.on_fail() + + for channel, watches in self.watches: + for watch in watches: + watch.failed() + + self.watches = {} + + for watch in self.fail_watches: + watch.fire() + + self.fail_watches = [] + self.failed = True def send(self, frames, reason=None): @@ -69,18 +80,19 @@ class Connection(object): if self.transcript is not None: self.transcript.on_frame(frame) - if isinstance(frame, AMQPMethodFrame): - if frame.channel in self.method_watches: - if isinstance(frame.payload, self.method_watches[frame.channel][0]): - method, callback = self.method_watches[frame.channel].popleft() - callback(frame.payload) - return + if frame.channel in self.watches: + deq = self.watches[frame.channel] - if isinstance(frame, AMQPHeartbeatFrame): - self.on_heartbeat() - return + examined_watches = [] + while len(deq) > 0: + watch = deq.popleft() + if not watch.is_triggered_by(frame) or (not watch.oneshot): + examined_watches.append(watch) - self.unwatched_frame(frame) + for watch in reversed(examined_watches): + deq.appendleft(watch) + + logger.critical('Unhandled frame %s, dropping', frame) def watch_watchdog(self, delay, callback): """ @@ -88,12 +100,23 @@ class Connection(object): """ self.listener_socket.oneshot(delay, callback) + def watch(self, watch): + """ + Register a watch. + :param watch: Watch to register + """ + if isinstance(watch, FailWatch): + self.fail_watches.append(watch) + else: + if watch.channel not in self.watches: + self.watches[watch.channel] = collections.deque([watch]) + else: + self.watches[watch.channel].append(watch) + def watch_for_method(self, channel, method, callback): """ :param channel: channel to monitor :param method: AMQPMethodPayload class :param callback: callable(AMQPMethodPayload instance) """ - if channel not in self.method_watches: - self.method_watches[channel] = collections.deque() - self.method_watches[channel].append((method, callback)) \ No newline at end of file + self.watch(MethodWatch(channel, method, callback)) diff --git a/coolamqp/uplink/connection/watches.py b/coolamqp/uplink/connection/watches.py new file mode 100644 index 0000000..eaf81e9 --- /dev/null +++ b/coolamqp/uplink/connection/watches.py @@ -0,0 +1,96 @@ +# coding=UTF-8 +from __future__ import absolute_import, division, print_function + +from coolamqp.framing.frames import AMQPMethodFrame, AMQPHeartbeatFrame +from coolamqp.framing.base import AMQPMethodPayload + + +class Watch(object): + """ + A watch is placed per-channel, to listen for a particular frame. + """ + + def __init__(self, channel, oneshot): + """ + :param channel: Channel to listen to + :param oneshot: Is destroyed after triggering? + """ + self.channel = channel + self.oneshot = oneshot + + def is_triggered_by(self, frame): + """ + Does frame trigger this watch? + Run callable if it does. + :param frame: AMQPFrame instance + :return: bool + """ + raise Exception('Abstract method') + + def failed(self): + """ + This watch will process things no more, because underlying + link has failed + """ + +class FailWatch(Watch): + """ + A special kind of watch that fires when connection has died + """ + def __init__(self, callable): + Watch.__init__(self, None, True) + self.callable = callable + + def fire(self): + """ + Connection failed! + """ + self.callable() + + +class HeartbeatWatch(Watch): + """ + Registered if heartbeats are enabled + """ + def __init__(self, callable): + Watch.__init__(self, 0, False) + self.callable = callable + + def is_triggered_by(self, frame): + if isinstance(frame, AMQPHeartbeatFrame): + self.callable() + return True + return False + + +class MethodWatch(Watch): + """ + One-shot watch listening for methods. + """ + def __init__(self, channel, method_or_methods, callable, on_end=None): + """ + :param method_or_methods: class, or list of AMQPMethodPayload classes + :param callable: callable(AMQPMethodPayload instance) + :param on_end: callable/0 on link dying + """ + Watch.__init__(self, channel, True) + self.callable = callable + if issubclass(method_or_methods, AMQPMethodPayload): + self.methods = (method_or_methods, ) + else: + self.methods = tuple(method_or_methods) + self.on_end = on_end + + def failed(self): + if self.on_end is not None: + self.on_end() + + def is_triggered_by(self, frame): + + if not isinstance(frame, AMQPMethodFrame): + return False + + if isinstance(frame.payload, self.methods): + self.callable(frame.payload) + return True + return False diff --git a/coolamqp/uplink/handshake.py b/coolamqp/uplink/handshake.py index ce097cf..48e50c1 100644 --- a/coolamqp/uplink/handshake.py +++ b/coolamqp/uplink/handshake.py @@ -26,7 +26,6 @@ class Handshaker(object): Object that given a connection rolls the handshake. """ - def __init__(self, connection, login, password, virtual_host, on_success, on_fail, heartbeat=0): """ @@ -83,7 +82,7 @@ class Handshaker(object): def on_connection_tune(self, payload): print('Channel max: ', payload.channel_max, 'Frame max: ', payload.frame_max, 'Heartbeat: ', payload.heartbeat) - self.channel_max = payload.channel_max + self.channel_max = 65535 if payload.channel_max == 0 else payload.channel_max self.frame_max = payload.frame_max self.heartbeat = min(payload.heartbeat, self.heartbeat) diff --git a/coolamqp/uplink/heartbeat.py b/coolamqp/uplink/heartbeat.py index ddf7f43..1804c52 100644 --- a/coolamqp/uplink/heartbeat.py +++ b/coolamqp/uplink/heartbeat.py @@ -3,7 +3,7 @@ from __future__ import absolute_import, division, print_function import monotonic from coolamqp.framing.frames import AMQPHeartbeatFrame - +from coolamqp.uplink.connection.watches import HeartbeatWatch class Heartbeater(object): """ @@ -17,7 +17,7 @@ class Heartbeater(object): self.last_heartbeat_on = monotonic.monotonic() # last heartbeat from server self.connection.watch_watchdog(self.heartbeat_interval, self.on_timer) - self.connection.on_heartbeat = self.on_heartbeat + self.connection.watch(HeartbeatWatch(self.on_heartbeat)) def on_heartbeat(self): self.last_heartbeat_on = monotonic.monotonic() diff --git a/tests/run.py b/tests/run.py index 7493382..262ea2f 100644 --- a/tests/run.py +++ b/tests/run.py @@ -3,6 +3,8 @@ from __future__ import absolute_import, division, print_function from coolamqp.uplink import ListenerThread, Connection import socket import time +from coolamqp.connection.state import Broker +from coolamqp.connection import NodeDefinition from coolamqp.uplink.transcript import SessionTranscript @@ -16,6 +18,9 @@ def newc(): from coolamqp.uplink import Handshaker +NODE = NodeDefinition('127.0.0.1', 5672, 'user', 'user', heartbeat=5) + + if __name__ == '__main__': lt = ListenerThread() @@ -23,9 +28,9 @@ if __name__ == '__main__': con = Connection(newc(), lt) con.transcript = SessionTranscript() + broker = Broker(con, NODE) - handshaker = Handshaker(con, 'user', 'user', '/', lambda: None, lambda: None, heartbeat=10) - con.start() + broker.connect() time.sleep(50) diff --git a/coolamqp/cluster/__init__.py b/tests/test_connection/__init__.py similarity index 100% rename from coolamqp/cluster/__init__.py rename to tests/test_connection/__init__.py diff --git a/tests/test_connection/test_state.py b/tests/test_connection/test_state.py new file mode 100644 index 0000000..5b9cd51 --- /dev/null +++ b/tests/test_connection/test_state.py @@ -0,0 +1,36 @@ +# coding=UTF-8 +from __future__ import absolute_import, division, print_function +import unittest + + +from coolamqp.connection.state import Broker +from coolamqp.connection import NodeDefinition +from coolamqp.uplink import ListenerThread, Connection, Handshaker +import socket +import time + + +def newc(): + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s.connect(('127.0.0.1', 5672)) + s.settimeout(0) + s.send('AMQP\x00\x00\x09\x01') + return s + + +NODE = NodeDefinition('127.0.0.1', 5672, 'user', 'user') + + +class TestState(unittest.TestCase): + def test_basic(self): + lt = ListenerThread() + lt.start() + + broker = Broker(Connection(newc(), lt), NODE) + + ord = broker.connect() + ord.wait() + + time.sleep(100) + + lt.terminate() -- GitLab