diff --git a/coolamqp/backends/pyamqp.py b/coolamqp/backends/pyamqp.py deleted file mode 100644 index bebc6deb77e8e27bee730334b054ca75329fc384..0000000000000000000000000000000000000000 --- a/coolamqp/backends/pyamqp.py +++ /dev/null @@ -1,169 +0,0 @@ -# coding=UTF-8 -"""Backend using pyamqp""" -from __future__ import division -import amqp -import socket -import six -import functools -import logging -from coolamqp.backends.base import AMQPBackend, RemoteAMQPError, ConnectionFailedError -import monotonic - - -logger = logging.getLogger(__name__) - - -def translate_exceptions(fun): - """ - Translates pyamqp's exceptions to CoolAMQP's - - py-amqp's exceptions are less than intuitive, so expect many special cases - """ - @functools.wraps(fun) - def q(*args, **kwargs): - try: - return fun(*args, **kwargs) - except (amqp.exceptions.ConsumerCancelled): - # I did not expect those here. Channel must be really bad. - raise ConnectionFailedError('WTF: '+(e.message if six.PY2 else e.args[0])) - except (amqp.exceptions.Blocked, ) as e: - pass # too bad - except (amqp.RecoverableChannelError, - amqp.exceptions.NotFound, - amqp.exceptions.AccessRefused) as e: - - - raise RemoteAMQPError(e.reply_code, e.reply_text) - except (IOError, - amqp.ConnectionForced, - amqp.IrrecoverableChannelError, - amqp.exceptions.UnexpectedFrame) as e: - raise ConnectionFailedError(e.message if six.PY2 else e.args[0]) - return q - - -class PyAMQPBackend(AMQPBackend): - @translate_exceptions - def __init__(self, node, cluster_handler_thread): - AMQPBackend.__init__(self, node, cluster_handler_thread) - - self.connection = amqp.Connection(host=node.host, - userid=node.user, - password=node.password, - virtual_host=node.virtual_host, - heartbeat=node.heartbeat or 0) - try: - self.connection.connect() #todo what does this raise? - except AttributeError: - pass # this does not always have to exist - self.channel = self.connection.channel() - self.channel.auto_decode = False - self.heartbeat = node.heartbeat or 0 - self.last_heartbeat_at = monotonic.monotonic() - - def shutdown(self): - AMQPBackend.shutdown(self) - print 'BACKEND SHUTDOWN START' - try: - self.channel.close() - except: - pass - try: - self.connection.close() - except: - pass - print 'BACKEND SHUTDOWN COMPLETE' - - @translate_exceptions - def process(self, max_time=1): - try: - if self.heartbeat > 0: - if monotonic.monotonic() - self.last_heartbeat_at > (self.heartbeat / 2): - self.connection.heartbeat_tick(rate=self.heartbeat) - self.last_heartbeat_at = monotonic.monotonic() - self.connection.drain_events(max_time) - except socket.timeout as e: - pass - - @translate_exceptions - def basic_cancel(self, consumer_tag): - self.channel.basic_cancel(consumer_tag) - - @translate_exceptions - def basic_publish(self, message, exchange, routing_key): - # convert this to pyamqp's Message - a = amqp.Message(six.binary_type(message.body), - **message.properties) - - self.channel.basic_publish(a, exchange=exchange.name, routing_key=routing_key) - - @translate_exceptions - def exchange_declare(self, exchange): - self.channel.exchange_declare(exchange.name, exchange.type, durable=exchange.durable, - auto_delete=exchange.auto_delete) - - @translate_exceptions - def queue_bind(self, queue, exchange, routing_key=''): - self.channel.queue_bind(queue.name, exchange.name, routing_key) - - @translate_exceptions - def basic_ack(self, delivery_tag): - self.channel.basic_ack(delivery_tag, multiple=False) - - @translate_exceptions - def exchange_delete(self, exchange): - self.channel.exchange_delete(exchange.name) - - @translate_exceptions - def basic_qos(self, prefetch_size, prefetch_count, global_): - self.channel.basic_qos(prefetch_size, prefetch_count, global_) - - @translate_exceptions - def queue_delete(self, queue): - self.channel.queue_delete(queue.name) - - @translate_exceptions - def basic_reject(self, delivery_tag): - self.channel.basic_reject(delivery_tag, True) - - @translate_exceptions - def queue_declare(self, queue): - """ - Declare a queue. - - This will change queue's name if anonymous - :param queue: Queue - """ - if queue.anonymous: - queue.name = '' - - qname, mc, cc = self.channel.queue_declare(queue.name, - durable=queue.durable, - exclusive=queue.exclusive, - auto_delete=queue.auto_delete) - if queue.anonymous: - queue.name = qname - - @translate_exceptions - def basic_consume(self, queue, no_ack=False): - """ - Start consuming from a queue - :param queue: Queue object - """ - self.channel.basic_consume(queue.name, - consumer_tag=queue.consumer_tag, - exclusive=queue.exclusive, - no_ack=no_ack, - callback=self.__on_message, - on_cancel=self.__on_consumercancelled) - - def __on_consumercancelled(self, consumer_tag): - self.cluster_handler_thread._on_consumercancelled(consumer_tag) - - def __on_message(self, message): - assert isinstance(message.body, six.binary_type) - self.cluster_handler_thread._on_recvmessage(message.body, - message.delivery_info['exchange'], - message.delivery_info['routing_key'], - message.delivery_info['delivery_tag'], - message.properties) diff --git a/coolamqp/connection/__init__.py b/coolamqp/connection/__init__.py index 9f2b35b38d89264ee25685611d0a65a192e165f6..569c71d552705427e8b7c577de85293ce3c7b462 100644 --- a/coolamqp/connection/__init__.py +++ b/coolamqp/connection/__init__.py @@ -1,2 +1,7 @@ # coding=UTF-8 +""" +This does things relating to a single connection +""" from __future__ import absolute_import, division, print_function + +from coolamqp.connection.definition import NodeDefinition diff --git a/coolamqp/connection/definition.py b/coolamqp/connection/definition.py index 82778d3e1fc77ba470b692b360325cbf1cb49f64..58682beab89ad6aa6aeb328e9105daf4772e1b03 100644 --- a/coolamqp/connection/definition.py +++ b/coolamqp/connection/definition.py @@ -8,8 +8,9 @@ class NodeDefinition(object): Definition of a node """ - def __init__(self, host, port, user, password, virtual_host='/', amqp_version='0.9.1', heartbeat=None): + def __init__(self, host, port, user, password, virtual_host='/', heartbeat=None): """ + All necessary information to establish a link to a broker. :param host: TCP host, str :param port: TCP port, int @@ -23,5 +24,10 @@ class NodeDefinition(object): self.host = host self.port = port self.virtual_host = virtual_host - self.amqp_version = amqp_version self.heartbeat = heartbeat + + def to_connection_object(self): + """ + Return a Connection object that + :return: + """ diff --git a/coolamqp/connection/orders.py b/coolamqp/connection/orders.py new file mode 100644 index 0000000000000000000000000000000000000000..9c43262c781ff4b4000466b67a5d61aa50890422 --- /dev/null +++ b/coolamqp/connection/orders.py @@ -0,0 +1,39 @@ +# coding=UTF-8 +from __future__ import absolute_import, division, print_function +""" +You can feed Broker with some orders. +They work pretty much like futures. +""" +import threading + + +class BaseOrder(object): + def __init__(self, on_completed=None, on_failed=None): + self.lock = threading.Lock() + self.lock.acquire() + self.on_completed = on_completed + self.on_failed = on_failed + + def on_done(self): + if self.on_completed is not None: + self.on_completed() + self.lock.release() + + def on_fail(self): + if self.on_failed is not None: + self.on_failed() + self.lock.release() + + def wait(self): + self.lock.acquire() + + +class LinkSetup(BaseOrder): + """ + Connecting to broker + """ + +class ConsumeOnChannel(BaseOrder): + """ + + """ \ No newline at end of file diff --git a/coolamqp/connection/state.py b/coolamqp/connection/state.py new file mode 100644 index 0000000000000000000000000000000000000000..b1c75e43b6352cf53123b77be386d72ec498630b --- /dev/null +++ b/coolamqp/connection/state.py @@ -0,0 +1,78 @@ +# coding=UTF-8 +from __future__ import absolute_import, division, print_function +import six +from coolamqp.uplink import Handshaker +from coolamqp.connection.orders import LinkSetup +from coolamqp.framing.definitions import ChannelOpenOk, ChannelOpen +from coolamqp.framing.frames import AMQPMethodFrame + + +class Broker(object): + """ + A connection to a single broker + """ + + def __init__(self, connection, node_definition): + """ + :param connection: coolamqp.uplink.Connection, before .start is called + :param node_definition: node definition that will be used to handshake + """ + self.connection = connection + self.node_definition = node_definition + + self.free_channels = [] # list of channels usable for consuming shit + + + def connect(self): + """Return an LinkSetup order to get when it connects""" + ls = LinkSetup() + + def send_channel_opened(): + # OK, opened + ls.on_done() + + def handshake_complete(): + if handshaker.channel_max < 1: + self.connection.send(None, 'channel_max < 1 !!!') + ls.on_fail() + return + + for free_chan in six.moves.xrange(2, handshaker.channel_max+1): + self.free_channels.append(free_chan) + + # It's OK, open channel 1 for sending messages + self.connection.watch_for_method(1, ChannelOpenOk, send_channel_opened) + self.connection.send([AMQPMethodFrame(1, ChannelOpen())]) + + handshaker = Handshaker( + self.connection, + self.node_definition.user, + self.node_definition.password, + self.node_definition.virtual_host, + handshake_complete, + ls.on_failed, + heartbeat=self.node_definition.heartbeat or 0 + ) + + def send_channel_opened(frame): + # OK, opened + ls.on_done() + + def handshake_complete(): + if handshaker.channel_max < 1: + self.connection.send(None, 'channel_max < 1 !!!') + ls.on_fail() + return + + for free_chan in six.moves.xrange(2, handshaker.channel_max+1): + self.free_channels.append(free_chan) + + # It's OK, open channel 1 for sending messages + self.connection.watch_for_method(1, ChannelOpenOk, send_channel_opened) + self.connection.send([AMQPMethodFrame(1, ChannelOpen())]) + + self.connection.start() + + return ls + + diff --git a/coolamqp/framing/base.py b/coolamqp/framing/base.py index 08d7aaed17f59c0429f08f68437f7661e1b3deb0..0bc471b2a7215379b218decc12d7bdbbca9d5446 100644 --- a/coolamqp/framing/base.py +++ b/coolamqp/framing/base.py @@ -151,6 +151,9 @@ class AMQPMethodPayload(AMQPPayload): :return: int, size of argument section """ + if self.IS_CONTENT_STATIC: + return len(self.STATIC_CONTENT)-4-4-1 # minus length, class, method, frame_end + raise NotImplementedError() def write_arguments(self, buf): diff --git a/coolamqp/framing/compilation/compile_definitions.py b/coolamqp/framing/compilation/compile_definitions.py index 41025f9479f1841d0b26f311d4808ff1486594f4..ff86fb4f5b39eda7680a2df38c1bcf420bb864c9 100644 --- a/coolamqp/framing/compilation/compile_definitions.py +++ b/coolamqp/framing/compilation/compile_definitions.py @@ -344,7 +344,7 @@ Field = collections.namedtuple('Field', ('name', 'type', 'basic_type', 'reserved line(''' STATIC_CONTENT = %s # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END ''', - to_code_binary(struct.pack('!LBB', static_size + 4, cls.index, method.index) + \ + to_code_binary(struct.pack('!LHH', static_size + 4, cls.index, method.index) + \ method.get_static_body() + \ struct.pack('!B', FRAME_END))) diff --git a/coolamqp/framing/definitions.py b/coolamqp/framing/definitions.py index d45411e25763a68a0d3648604d0c85b7a467e123..d0b25b03147eb1bc1a8a07dcfa1c10605ddc9344 100644 --- a/coolamqp/framing/definitions.py +++ b/coolamqp/framing/definitions.py @@ -191,7 +191,7 @@ class ConnectionCloseOk(AMQPMethodPayload): IS_SIZE_STATIC = True # this means that argument part has always the same length IS_CONTENT_STATIC = True # this means that argument part has always the same content - STATIC_CONTENT = b'\x00\x00\x00\x04\x0A\x33\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END + STATIC_CONTENT = b'\x00\x00\x00\x04\x00\x0A\x00\x33\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END def __init__(self): """ @@ -279,7 +279,7 @@ class ConnectionOpenOk(AMQPMethodPayload): IS_SIZE_STATIC = False # this means that argument part has always the same length IS_CONTENT_STATIC = True # this means that argument part has always the same content - STATIC_CONTENT = b'\x00\x00\x00\x04\x0A\x29\x00\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END + STATIC_CONTENT = b'\x00\x00\x00\x04\x00\x0A\x00\x29\x00\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END # See constructor pydoc for details FIELDS = [ @@ -788,7 +788,7 @@ class ChannelCloseOk(AMQPMethodPayload): IS_SIZE_STATIC = True # this means that argument part has always the same length IS_CONTENT_STATIC = True # this means that argument part has always the same content - STATIC_CONTENT = b'\x00\x00\x00\x04\x14\x29\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END + STATIC_CONTENT = b'\x00\x00\x00\x04\x00\x14\x00\x29\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END def __init__(self): """ @@ -915,7 +915,7 @@ class ChannelOpen(AMQPMethodPayload): IS_SIZE_STATIC = False # this means that argument part has always the same length IS_CONTENT_STATIC = True # this means that argument part has always the same content - STATIC_CONTENT = b'\x00\x00\x00\x05\x14\x0A\x00\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END + STATIC_CONTENT = b'\x00\x00\x00\x05\x00\x14\x00\x0A\x00\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END # See constructor pydoc for details FIELDS = [ @@ -952,7 +952,7 @@ class ChannelOpenOk(AMQPMethodPayload): IS_SIZE_STATIC = False # this means that argument part has always the same length IS_CONTENT_STATIC = True # this means that argument part has always the same content - STATIC_CONTENT = b'\x00\x00\x00\x05\x14\x0B\x00\x00\x00\x00\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END + STATIC_CONTENT = b'\x00\x00\x00\x05\x00\x14\x00\x0B\x00\x00\x00\x00\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END # See constructor pydoc for details FIELDS = [ @@ -1168,7 +1168,7 @@ class ExchangeDeclareOk(AMQPMethodPayload): IS_SIZE_STATIC = True # this means that argument part has always the same length IS_CONTENT_STATIC = True # this means that argument part has always the same content - STATIC_CONTENT = b'\x00\x00\x00\x04\x28\x0B\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END + STATIC_CONTENT = b'\x00\x00\x00\x04\x00\x28\x00\x0B\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END def __init__(self): """ @@ -1197,7 +1197,7 @@ class ExchangeDeleteOk(AMQPMethodPayload): IS_SIZE_STATIC = True # this means that argument part has always the same length IS_CONTENT_STATIC = True # this means that argument part has always the same content - STATIC_CONTENT = b'\x00\x00\x00\x04\x28\x15\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END + STATIC_CONTENT = b'\x00\x00\x00\x04\x00\x28\x00\x15\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END def __init__(self): """ @@ -1334,7 +1334,7 @@ class QueueBindOk(AMQPMethodPayload): IS_SIZE_STATIC = True # this means that argument part has always the same length IS_CONTENT_STATIC = True # this means that argument part has always the same content - STATIC_CONTENT = b'\x00\x00\x00\x04\x32\x15\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END + STATIC_CONTENT = b'\x00\x00\x00\x04\x00\x32\x00\x15\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END def __init__(self): """ @@ -1826,7 +1826,7 @@ class QueueUnbindOk(AMQPMethodPayload): IS_SIZE_STATIC = True # this means that argument part has always the same length IS_CONTENT_STATIC = True # this means that argument part has always the same content - STATIC_CONTENT = b'\x00\x00\x00\x04\x32\x33\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END + STATIC_CONTENT = b'\x00\x00\x00\x04\x00\x32\x00\x33\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END def __init__(self): """ @@ -2503,7 +2503,7 @@ class BasicGetEmpty(AMQPMethodPayload): IS_SIZE_STATIC = False # this means that argument part has always the same length IS_CONTENT_STATIC = True # this means that argument part has always the same content - STATIC_CONTENT = b'\x00\x00\x00\x0D\x3C\x48\x00\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END + STATIC_CONTENT = b'\x00\x00\x00\x0D\x00\x3C\x00\x48\x00\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END # See constructor pydoc for details FIELDS = [ @@ -2700,7 +2700,7 @@ class BasicQosOk(AMQPMethodPayload): IS_SIZE_STATIC = True # this means that argument part has always the same length IS_CONTENT_STATIC = True # this means that argument part has always the same content - STATIC_CONTENT = b'\x00\x00\x00\x04\x3C\x0B\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END + STATIC_CONTENT = b'\x00\x00\x00\x04\x00\x3C\x00\x0B\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END def __init__(self): """ @@ -2957,7 +2957,7 @@ class BasicRecoverOk(AMQPMethodPayload): IS_SIZE_STATIC = True # this means that argument part has always the same length IS_CONTENT_STATIC = True # this means that argument part has always the same content - STATIC_CONTENT = b'\x00\x00\x00\x04\x3C\x6F\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END + STATIC_CONTENT = b'\x00\x00\x00\x04\x00\x3C\x00\x6F\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END def __init__(self): """ @@ -3004,7 +3004,7 @@ class TxCommit(AMQPMethodPayload): IS_SIZE_STATIC = True # this means that argument part has always the same length IS_CONTENT_STATIC = True # this means that argument part has always the same content - STATIC_CONTENT = b'\x00\x00\x00\x04\x5A\x14\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END + STATIC_CONTENT = b'\x00\x00\x00\x04\x00\x5A\x00\x14\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END def __init__(self): """ @@ -3034,7 +3034,7 @@ class TxCommitOk(AMQPMethodPayload): IS_SIZE_STATIC = True # this means that argument part has always the same length IS_CONTENT_STATIC = True # this means that argument part has always the same content - STATIC_CONTENT = b'\x00\x00\x00\x04\x5A\x15\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END + STATIC_CONTENT = b'\x00\x00\x00\x04\x00\x5A\x00\x15\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END def __init__(self): """ @@ -3066,7 +3066,7 @@ class TxRollback(AMQPMethodPayload): IS_SIZE_STATIC = True # this means that argument part has always the same length IS_CONTENT_STATIC = True # this means that argument part has always the same content - STATIC_CONTENT = b'\x00\x00\x00\x04\x5A\x1E\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END + STATIC_CONTENT = b'\x00\x00\x00\x04\x00\x5A\x00\x1E\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END def __init__(self): """ @@ -3096,7 +3096,7 @@ class TxRollbackOk(AMQPMethodPayload): IS_SIZE_STATIC = True # this means that argument part has always the same length IS_CONTENT_STATIC = True # this means that argument part has always the same content - STATIC_CONTENT = b'\x00\x00\x00\x04\x5A\x1F\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END + STATIC_CONTENT = b'\x00\x00\x00\x04\x00\x5A\x00\x1F\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END def __init__(self): """ @@ -3126,7 +3126,7 @@ class TxSelect(AMQPMethodPayload): IS_SIZE_STATIC = True # this means that argument part has always the same length IS_CONTENT_STATIC = True # this means that argument part has always the same content - STATIC_CONTENT = b'\x00\x00\x00\x04\x5A\x0A\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END + STATIC_CONTENT = b'\x00\x00\x00\x04\x00\x5A\x00\x0A\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END def __init__(self): """ @@ -3156,7 +3156,7 @@ class TxSelectOk(AMQPMethodPayload): IS_SIZE_STATIC = True # this means that argument part has always the same length IS_CONTENT_STATIC = True # this means that argument part has always the same content - STATIC_CONTENT = b'\x00\x00\x00\x04\x5A\x0B\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END + STATIC_CONTENT = b'\x00\x00\x00\x04\x00\x5A\x00\x0B\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END def __init__(self): """ diff --git a/coolamqp/scaffold.py b/coolamqp/scaffold.py deleted file mode 100644 index 7c3d245ce3bd458d561d5ac8d75c8e41df97daf2..0000000000000000000000000000000000000000 --- a/coolamqp/scaffold.py +++ /dev/null @@ -1,43 +0,0 @@ -# coding=UTF-8 -""" -Utilities for building the CoolAMQP engine -""" -from __future__ import absolute_import, division, print_function -import functools -import threading - - -class Synchronized(object): - """Protects access to methods with a lock""" - def __init__(self): - self.lock = threading.Lock() - - def protect(self): - """Make the function body lock-protected""" - def outer(fun): - @functools.wraps(fun) - def inner(*args, **kwargs): - with self.lock: - return fun(*args, **kwargs) - return inner - return outer - - -class AcceptsFrames(object): - """Base class for objects that accept AMQP frames""" - - def on_frame(self, amqp_frame): - """ - :type amqp_frame: AMQPFrame object - """ - - -class EmitsFrames(object): - """Base class for objects that send AMQP frames somewhere""" - - def wire_frame_to(self, acceptor): - """ - Configure this object to send frames somewhere. - - :param acceptor: an AcceptsFrames instance or callable(AMQPMethod instance) - """ \ No newline at end of file diff --git a/coolamqp/uplink/__init__.py b/coolamqp/uplink/__init__.py index f124517d83d00b09a9d5add7cedaf3b2fa6153bc..11baba98ac3df9ef7ad6510a124d7065d8ab476b 100644 --- a/coolamqp/uplink/__init__.py +++ b/coolamqp/uplink/__init__.py @@ -1,11 +1,17 @@ # coding=UTF-8 """ -The layer that allows you to attach Reactors, -ie. objects that are informed upon receiving a frame or connection dying. -They can also send frames themselves. + +Core object here is Connection. This package: + - establishes basic connectivity (up to the point where you can open channels yourself) + - takes care of heartbeats + +You can wait for a particular frame by setting watches on connections. +Watches will fire upon an event triggering them. + """ from __future__ import absolute_import, division, print_function from coolamqp.uplink.connection import Connection from coolamqp.uplink.handshake import Handshaker from coolamqp.uplink.listener import ListenerThread +from coolamqp.uplink.connection import FailWatch, Watch diff --git a/coolamqp/uplink/connection/__init__.py b/coolamqp/uplink/connection/__init__.py index beed0f95a3f6ee57be9b0df84979d88e9f8b8b38..148c579b0a94f5d48565b02326505e1f7f4dedb1 100644 --- a/coolamqp/uplink/connection/__init__.py +++ b/coolamqp/uplink/connection/__init__.py @@ -9,3 +9,4 @@ Connection is something that can: from __future__ import absolute_import, division, print_function from coolamqp.uplink.connection.connection import Connection +from coolamqp.uplink.connection.watches import FailWatch, Watch diff --git a/coolamqp/uplink/connection/connection.py b/coolamqp/uplink/connection/connection.py index 20c2aa24371449fc12b6fa90f2f75d4258f21591..fa375145e0f849375e26d8ffcb27611fd90129bb 100644 --- a/coolamqp/uplink/connection/connection.py +++ b/coolamqp/uplink/connection/connection.py @@ -8,6 +8,7 @@ from coolamqp.uplink.connection.recv_framer import ReceivingFramer from coolamqp.uplink.connection.send_framer import SendingFramer from coolamqp.framing.frames import AMQPMethodFrame, AMQPHeartbeatFrame +from coolamqp.uplink.connection.watches import MethodWatch, FailWatch logger = logging.getLogger(__name__) @@ -26,11 +27,8 @@ class Connection(object): self.failed = False self.transcript = None - self.method_watches = {} # channel => [AMQPMethodPayload instance, callback] - - # to call if an unwatched frame is caught - self.on_heartbeat = lambda: None - self.unwatched_frame = lambda frame: None # callable(AMQPFrame instance) + self.watches = {} # channel => [Watch object] + self.fail_watches = [] def start(self): """ @@ -43,8 +41,21 @@ class Connection(object): self.sendf = SendingFramer(self.listener_socket.send) def on_fail(self): + """Underlying connection is closed""" if self.transcript is not None: self.transcript.on_fail() + + for channel, watches in self.watches: + for watch in watches: + watch.failed() + + self.watches = {} + + for watch in self.fail_watches: + watch.fire() + + self.fail_watches = [] + self.failed = True def send(self, frames, reason=None): @@ -69,18 +80,19 @@ class Connection(object): if self.transcript is not None: self.transcript.on_frame(frame) - if isinstance(frame, AMQPMethodFrame): - if frame.channel in self.method_watches: - if isinstance(frame.payload, self.method_watches[frame.channel][0]): - method, callback = self.method_watches[frame.channel].popleft() - callback(frame.payload) - return + if frame.channel in self.watches: + deq = self.watches[frame.channel] - if isinstance(frame, AMQPHeartbeatFrame): - self.on_heartbeat() - return + examined_watches = [] + while len(deq) > 0: + watch = deq.popleft() + if not watch.is_triggered_by(frame) or (not watch.oneshot): + examined_watches.append(watch) - self.unwatched_frame(frame) + for watch in reversed(examined_watches): + deq.appendleft(watch) + + logger.critical('Unhandled frame %s, dropping', frame) def watch_watchdog(self, delay, callback): """ @@ -88,12 +100,23 @@ class Connection(object): """ self.listener_socket.oneshot(delay, callback) + def watch(self, watch): + """ + Register a watch. + :param watch: Watch to register + """ + if isinstance(watch, FailWatch): + self.fail_watches.append(watch) + else: + if watch.channel not in self.watches: + self.watches[watch.channel] = collections.deque([watch]) + else: + self.watches[watch.channel].append(watch) + def watch_for_method(self, channel, method, callback): """ :param channel: channel to monitor :param method: AMQPMethodPayload class :param callback: callable(AMQPMethodPayload instance) """ - if channel not in self.method_watches: - self.method_watches[channel] = collections.deque() - self.method_watches[channel].append((method, callback)) \ No newline at end of file + self.watch(MethodWatch(channel, method, callback)) diff --git a/coolamqp/uplink/connection/watches.py b/coolamqp/uplink/connection/watches.py new file mode 100644 index 0000000000000000000000000000000000000000..eaf81e9d1afbbb0c6d481c8b2196b54205c8f8d7 --- /dev/null +++ b/coolamqp/uplink/connection/watches.py @@ -0,0 +1,96 @@ +# coding=UTF-8 +from __future__ import absolute_import, division, print_function + +from coolamqp.framing.frames import AMQPMethodFrame, AMQPHeartbeatFrame +from coolamqp.framing.base import AMQPMethodPayload + + +class Watch(object): + """ + A watch is placed per-channel, to listen for a particular frame. + """ + + def __init__(self, channel, oneshot): + """ + :param channel: Channel to listen to + :param oneshot: Is destroyed after triggering? + """ + self.channel = channel + self.oneshot = oneshot + + def is_triggered_by(self, frame): + """ + Does frame trigger this watch? + Run callable if it does. + :param frame: AMQPFrame instance + :return: bool + """ + raise Exception('Abstract method') + + def failed(self): + """ + This watch will process things no more, because underlying + link has failed + """ + +class FailWatch(Watch): + """ + A special kind of watch that fires when connection has died + """ + def __init__(self, callable): + Watch.__init__(self, None, True) + self.callable = callable + + def fire(self): + """ + Connection failed! + """ + self.callable() + + +class HeartbeatWatch(Watch): + """ + Registered if heartbeats are enabled + """ + def __init__(self, callable): + Watch.__init__(self, 0, False) + self.callable = callable + + def is_triggered_by(self, frame): + if isinstance(frame, AMQPHeartbeatFrame): + self.callable() + return True + return False + + +class MethodWatch(Watch): + """ + One-shot watch listening for methods. + """ + def __init__(self, channel, method_or_methods, callable, on_end=None): + """ + :param method_or_methods: class, or list of AMQPMethodPayload classes + :param callable: callable(AMQPMethodPayload instance) + :param on_end: callable/0 on link dying + """ + Watch.__init__(self, channel, True) + self.callable = callable + if issubclass(method_or_methods, AMQPMethodPayload): + self.methods = (method_or_methods, ) + else: + self.methods = tuple(method_or_methods) + self.on_end = on_end + + def failed(self): + if self.on_end is not None: + self.on_end() + + def is_triggered_by(self, frame): + + if not isinstance(frame, AMQPMethodFrame): + return False + + if isinstance(frame.payload, self.methods): + self.callable(frame.payload) + return True + return False diff --git a/coolamqp/uplink/handshake.py b/coolamqp/uplink/handshake.py index ce097cff8e3fad8c92c82f085f2005ce6f9130f1..48e50c191c4349ec32d03bba6138c2be72662bb6 100644 --- a/coolamqp/uplink/handshake.py +++ b/coolamqp/uplink/handshake.py @@ -26,7 +26,6 @@ class Handshaker(object): Object that given a connection rolls the handshake. """ - def __init__(self, connection, login, password, virtual_host, on_success, on_fail, heartbeat=0): """ @@ -83,7 +82,7 @@ class Handshaker(object): def on_connection_tune(self, payload): print('Channel max: ', payload.channel_max, 'Frame max: ', payload.frame_max, 'Heartbeat: ', payload.heartbeat) - self.channel_max = payload.channel_max + self.channel_max = 65535 if payload.channel_max == 0 else payload.channel_max self.frame_max = payload.frame_max self.heartbeat = min(payload.heartbeat, self.heartbeat) diff --git a/coolamqp/uplink/heartbeat.py b/coolamqp/uplink/heartbeat.py index ddf7f43265c06b15057366f335300fa6baa01859..1804c52b46aaf46602ea779ece573f3212070507 100644 --- a/coolamqp/uplink/heartbeat.py +++ b/coolamqp/uplink/heartbeat.py @@ -3,7 +3,7 @@ from __future__ import absolute_import, division, print_function import monotonic from coolamqp.framing.frames import AMQPHeartbeatFrame - +from coolamqp.uplink.connection.watches import HeartbeatWatch class Heartbeater(object): """ @@ -17,7 +17,7 @@ class Heartbeater(object): self.last_heartbeat_on = monotonic.monotonic() # last heartbeat from server self.connection.watch_watchdog(self.heartbeat_interval, self.on_timer) - self.connection.on_heartbeat = self.on_heartbeat + self.connection.watch(HeartbeatWatch(self.on_heartbeat)) def on_heartbeat(self): self.last_heartbeat_on = monotonic.monotonic() diff --git a/tests/run.py b/tests/run.py index 74933822cdf47aec324f0432086cfd100f7ef67f..262ea2fbdb12ef42b0aea732ca5aa2ace0e6cd24 100644 --- a/tests/run.py +++ b/tests/run.py @@ -3,6 +3,8 @@ from __future__ import absolute_import, division, print_function from coolamqp.uplink import ListenerThread, Connection import socket import time +from coolamqp.connection.state import Broker +from coolamqp.connection import NodeDefinition from coolamqp.uplink.transcript import SessionTranscript @@ -16,6 +18,9 @@ def newc(): from coolamqp.uplink import Handshaker +NODE = NodeDefinition('127.0.0.1', 5672, 'user', 'user', heartbeat=5) + + if __name__ == '__main__': lt = ListenerThread() @@ -23,9 +28,9 @@ if __name__ == '__main__': con = Connection(newc(), lt) con.transcript = SessionTranscript() + broker = Broker(con, NODE) - handshaker = Handshaker(con, 'user', 'user', '/', lambda: None, lambda: None, heartbeat=10) - con.start() + broker.connect() time.sleep(50) diff --git a/coolamqp/cluster/__init__.py b/tests/test_connection/__init__.py similarity index 100% rename from coolamqp/cluster/__init__.py rename to tests/test_connection/__init__.py diff --git a/tests/test_connection/test_state.py b/tests/test_connection/test_state.py new file mode 100644 index 0000000000000000000000000000000000000000..5b9cd514256234ac64d922a9ac450cae0bea38b8 --- /dev/null +++ b/tests/test_connection/test_state.py @@ -0,0 +1,36 @@ +# coding=UTF-8 +from __future__ import absolute_import, division, print_function +import unittest + + +from coolamqp.connection.state import Broker +from coolamqp.connection import NodeDefinition +from coolamqp.uplink import ListenerThread, Connection, Handshaker +import socket +import time + + +def newc(): + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s.connect(('127.0.0.1', 5672)) + s.settimeout(0) + s.send('AMQP\x00\x00\x09\x01') + return s + + +NODE = NodeDefinition('127.0.0.1', 5672, 'user', 'user') + + +class TestState(unittest.TestCase): + def test_basic(self): + lt = ListenerThread() + lt.start() + + broker = Broker(Connection(newc(), lt), NODE) + + ord = broker.connect() + ord.wait() + + time.sleep(100) + + lt.terminate()