From 3612eb4bc5ab4e3f77c4130b684488f8ae84cc9c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Ma=C5=9Blanka?= <piotr.maslanka@henrietta.com.pl> Date: Tue, 3 Jan 2017 03:38:13 +0100 Subject: [PATCH] next refactor --- coolamqp/connection/connecting.py | 71 ------- coolamqp/connection/state.py | 99 ---------- coolamqp/uplink/__init__.py | 2 - coolamqp/uplink/connection/__init__.py | 3 + coolamqp/uplink/connection/connection.py | 182 ++++++++++++------ .../connection/states.py} | 4 + coolamqp/uplink/connection/watches.py | 16 +- coolamqp/uplink/handshake.py | 91 ++++----- coolamqp/uplink/heartbeat.py | 8 +- coolamqp/uplink/listener/__init__.py | 2 + coolamqp/uplink/transcript.py | 32 --- tests/run.py | 10 +- 12 files changed, 204 insertions(+), 316 deletions(-) delete mode 100644 coolamqp/connection/connecting.py delete mode 100644 coolamqp/connection/state.py rename coolamqp/{connection/publishing.py => uplink/connection/states.py} (62%) delete mode 100644 coolamqp/uplink/transcript.py diff --git a/coolamqp/connection/connecting.py b/coolamqp/connection/connecting.py deleted file mode 100644 index 6d1a0e9..0000000 --- a/coolamqp/connection/connecting.py +++ /dev/null @@ -1,71 +0,0 @@ -# coding=UTF-8 -from __future__ import absolute_import, division, print_function -import logging -import six -from coolamqp.state.orders import BaseOrder -from coolamqp.framing.definitions import ChannelOpenOk, ChannelOpen -from coolamqp.framing.frames import AMQPMethodFrame -from coolamqp.uplink import Handshaker -from coolamqp.framing.extensions import PUBLISHER_CONFIRMS -""" -All the routines required to go from connecting to synced -""" - -logger = logging.getLogger(__name__) - - -class ConnectingPeople(BaseOrder): - def configure(self, broker, handshaker, connection, nodedef): - self.handshaker = handshaker - self.broker = broker - self.connection = connection - self.node_definition = nodedef - - def on_fail(self, reason): - """Called at any time, by anything going wrong with basic init and sync""" - self.set_exception(Exception(reason or 'Initialization failed')) - - def handshake_complete(self): - """Called by handshaker, upon completing the initial frame exchange""" - logger.info('%s:%s entered RANGING', self.node_definition.host, self.node_definition.port) - self.broker.extensions = set(self.handshaker.extensions) - if self.handshaker.channel_max < 2: - self.connection.send(None, 'channel_max < 2 !!!') - self.on_fail('channel_max < 2 !!') - return - - for free_chan in six.moves.xrange(3, self.handshaker.channel_max + 1): - self.broker.free_channels.append(free_chan) - - # It's OK, open channel 1 for sending messages - self.connection.watch_for_method(1, ChannelOpenOk, self.send_nonconfirm_channel_opened) - self.connection.send([AMQPMethodFrame(1, ChannelOpen())]) - - def send_nonconfirm_channel_opened(self, payload): - """Called upon opening channel #1, which is used for publishing messages without confirmation""" - - if PUBLISHER_CONFIRMS in self.handshaker.extensions: - # We need to set up channel 2 with publisher confirms - self.connection.watch_for_method(2, ChannelOpenOk, self.on_channel_2) - self.connection.send([AMQPMethodFrame(2, ChannelOpen())]) - else: - # Else we don't set up this channel - self.on_syncing() - - def on_channel_2(self, payload): - """Do things with Channel 2 - only if PUBLISHER_CONFIRMATION extension is enabled!""" - from coolamqp.framing.definitions import ConfirmSelect, ConfirmSelectOk - if isinstance(payload, ChannelOpenOk): - # Ok, just opened the channel - self.connection.watch_for_method(2, ConfirmSelectOk, self.on_channel_2) - self.connection.send([AMQPMethodFrame(2, ConfirmSelect(False))]) - elif isinstance(payload, ConfirmSelectOk): - # A-OK! - logger.info('%s:%s entered SYNCING', self.node_definition.host, self.node_definition.port) - - def on_syncing(self): - """We are entering SYNCING""" - logger.info('%s:%s entered SYNCING', self.node_definition.host, self.node_definition.port) - self.connection.on_connected() - - diff --git a/coolamqp/connection/state.py b/coolamqp/connection/state.py deleted file mode 100644 index 5836e34..0000000 --- a/coolamqp/connection/state.py +++ /dev/null @@ -1,99 +0,0 @@ -# coding=UTF-8 -from __future__ import absolute_import, division, print_function -import six -from coolamqp.uplink import Handshaker -from coolamqp.connection.orders import LinkSetup -from coolamqp.framing.definitions import ChannelOpenOk, ChannelOpen -from coolamqp.framing.frames import AMQPMethodFrame - - -class Broker(object): - """ - A connection to a single broker - """ - - def __init__(self, connection, node_definition): - """ - :param connection: coolamqp.uplink.Connection, before .start is called - :param node_definition: node definition that will be used to handshake - """ - self.connection = connection - self.node_definition = node_definition - - self.free_channels = [] # list of channels usable for consuming shit - - @staticmethod - def from_node_def(node_def, listener_thread, debug=True): - """ - :param node_def: NodeDefinition to use - :param listener_thread: ListenerThread to use - :return: a Broker with Connection. - """ - import socket - s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - s.connect((node_def.host, node_def.port)) - s.settimeout(0) - s.send('AMQP\x00\x00\x09\x01') - - from coolamqp.uplink import Connection - con = Connection(s, listener_thread) - - if debug: - from coolamqp.uplink.transcript import SessionTranscript - con.transcript = SessionTranscript() - - return Broker(con, node_def) - - def connect(self): - """Return an LinkSetup order to get when it connects""" - ls = LinkSetup() - - def send_channel_opened(): - # OK, opened - ls.on_done() - - def handshake_complete(): - if handshaker.channel_max < 1: - self.connection.send(None, 'channel_max < 1 !!!') - ls.on_fail() - return - - for free_chan in six.moves.xrange(2, handshaker.channel_max+1): - self.free_channels.append(free_chan) - - # It's OK, open channel 1 for sending messages - self.connection.watch_for_method(1, ChannelOpenOk, send_channel_opened) - self.connection.send([AMQPMethodFrame(1, ChannelOpen())]) - - handshaker = Handshaker( - self.connection, - self.node_definition.user, - self.node_definition.password, - self.node_definition.virtual_host, - handshake_complete, - ls.on_failed, - heartbeat=self.node_definition.heartbeat or 0 - ) - - def send_channel_opened(frame): - # OK, opened - ls.on_done() - - def handshake_complete(): - if handshaker.channel_max < 1: - self.connection.send(None, 'channel_max < 1 !!!') - ls.on_fail() - return - - for free_chan in six.moves.xrange(2, handshaker.channel_max+1): - self.free_channels.append(free_chan) - - # It's OK, open channel 1 for sending messages - self.connection.watch_for_method(1, ChannelOpenOk, send_channel_opened) - self.connection.send([AMQPMethodFrame(1, ChannelOpen())]) - - self.connection.start() - - return ls - - diff --git a/coolamqp/uplink/__init__.py b/coolamqp/uplink/__init__.py index 11baba9..fb9906e 100644 --- a/coolamqp/uplink/__init__.py +++ b/coolamqp/uplink/__init__.py @@ -12,6 +12,4 @@ Watches will fire upon an event triggering them. from __future__ import absolute_import, division, print_function from coolamqp.uplink.connection import Connection -from coolamqp.uplink.handshake import Handshaker from coolamqp.uplink.listener import ListenerThread -from coolamqp.uplink.connection import FailWatch, Watch diff --git a/coolamqp/uplink/connection/__init__.py b/coolamqp/uplink/connection/__init__.py index 148c579..f5fd1b4 100644 --- a/coolamqp/uplink/connection/__init__.py +++ b/coolamqp/uplink/connection/__init__.py @@ -5,6 +5,9 @@ Comprehensive management of a framing connection. Connection is something that can: - call something when an AMQPFrame is received - send AMQPFrame's + + Pretty much CoolAMQP is about persistent "attaches" that attach to transient connection + (they die when down) to do stuff, ie. send messages, consume, etc. """ from __future__ import absolute_import, division, print_function diff --git a/coolamqp/uplink/connection/connection.py b/coolamqp/uplink/connection/connection.py index fa37514..3130272 100644 --- a/coolamqp/uplink/connection/connection.py +++ b/coolamqp/uplink/connection/connection.py @@ -2,13 +2,17 @@ from __future__ import absolute_import, division, print_function import logging import collections -from coolamqp.uplink.listener import ListenerThread +import socket +import six from coolamqp.uplink.connection.recv_framer import ReceivingFramer from coolamqp.uplink.connection.send_framer import SendingFramer -from coolamqp.framing.frames import AMQPMethodFrame, AMQPHeartbeatFrame +from coolamqp.framing.frames import AMQPMethodFrame +from coolamqp.uplink.handshake import Handshaker +from coolamqp.framing.definitions import ConnectionClose, ConnectionCloseOk +from coolamqp.uplink.connection.watches import MethodWatch +from coolamqp.uplink.connection.states import ST_ONLINE, ST_OFFLINE, ST_CONNECTING -from coolamqp.uplink.connection.watches import MethodWatch, FailWatch logger = logging.getLogger(__name__) @@ -18,85 +22,156 @@ class Connection(object): An object that manages a connection in a comprehensive way. It allows for sending and registering watches for particular things. + + WARNING: Thread-safety of watch operation hinges on atomicity + of .append and .pop. + + Lifecycle of connection is such: + + Connection created -> state is ST_CONNECTING + .start() called -> state is ST_CONNECTING + connection.open-ok -> state is ST_ONLINE """ - def __init__(self, socketobject, listener_thread): + def __init__(self, node_definition, listener_thread): + """ + Create an object that links to an AMQP broker. + + No data will be physically sent until you hit .start() + + :param node_definition: NodeDefinition instance to use + :param listener_thread: ListenerThread to use as async engine + """ self.listener_thread = listener_thread - self.socketobject = socketobject + self.node_definition = node_definition + self.recvf = ReceivingFramer(self.on_frame) - self.failed = False - self.transcript = None - self.watches = {} # channel => [Watch object] - self.fail_watches = [] + self.watches = {} # channel => list of [Watch instance] + + self.state = ST_CONNECTING + + # Negotiated connection parameters - handshake will fill this in + self.free_channels = [] # attaches can use this for shit. + # WARNING: thread safety of this hinges on atomicity of .pop or .append + self.frame_max = None + self.heartbeat = None + self.extensions = [] + + def on_connected(self): + """Called by handshaker upon reception of final connection.open-ok""" + self.state = ST_ONLINE def start(self): """ - Start processing events for this connect - :return: + Start processing events for this connect. Create the socket, + transmit 'AMQP\x00\x00\x09\x01' and roll. """ - self.listener_socket = self.listener_thread.register(self.socketobject, + + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.connect((self.node_definition.host, self.node_definition.port)) + sock.settimeout(0) + sock.send('AMQP\x00\x00\x09\x01') + + Handshaker(self, self.node_definition, self.on_connected) + self.listener_socket = self.listener_thread.register(sock, on_read=self.recvf.put, on_fail=self.on_fail) self.sendf = SendingFramer(self.listener_socket.send) + self.watch_for_method(0, (ConnectionClose, ConnectionCloseOk), self.on_connection_close) def on_fail(self): - """Underlying connection is closed""" - if self.transcript is not None: - self.transcript.on_fail() + """ + Called by event loop when the underlying connection is closed. + + This means the connection is dead, cannot be used anymore, and all operations + running on it now are aborted, null and void. + + This calls fails all registered watches. + Called by ListenerThread. - for channel, watches in self.watches: + WARNING: Note that .on_fail can get called twice - once from .on_connection_close, + and second time from ListenerThread when socket is disposed of + """ + self.state = ST_OFFLINE # Update state + + for channel, watches in six.iteritems(self.watches): # Run all watches - failed for watch in watches: watch.failed() - self.watches = {} + self.watches = {} # Clear the watch list - for watch in self.fail_watches: - watch.fire() + def on_connection_close(self, payload): + """ + Server attempted to close the connection.. or maybe we did? - self.fail_watches = [] + Called by ListenerThread. + """ + self.on_fail() # it does not make sense to prolong the agony - self.failed = True + if isinstance(payload, ConnectionClose): + self.send([AMQPMethodFrame(0, ConnectionCloseOk())]) + elif isinstance(payload, ConnectionCloseOk): + self.send(None) - def send(self, frames, reason=None): + def send(self, frames): """ :param frames: list of frames or None to close the link :param reason: optional human-readable reason for this action """ - if not self.failed: - if frames is not None: - self.sendf.send(frames) - if self.transcript is not None: - for frame in frames: - self.transcript.on_send(frame, reason) - else: - self.listener_socket.send(None) - self.failed = True - - if self.transcript is not None: - self.transcript.on_close_client(reason) + if frames is not None: + self.sendf.send(frames) + else: + # Listener socket will kill us when time is right + self.listener_socket.send(None) def on_frame(self, frame): - if self.transcript is not None: - self.transcript.on_frame(frame) + """ + Called by event loop upon receiving an AMQP frame. + + This will verify all watches on given channel if they were hit, + and take appropriate action. + Unhandled frames will be logged - if they were sent, they probably were important. + + :param frame: AMQPFrame that was received + """ + if isinstance(frame, AMQPMethodFrame): # temporary, for debugging + print('RECEIVED', frame.payload.NAME) + else: + print('RECEIVED ', frame) + + watch_handled = False # True if ANY watch handled this if frame.channel in self.watches: - deq = self.watches[frame.channel] + watches = self.watches[frame.channel] # a list - examined_watches = [] - while len(deq) > 0: - watch = deq.popleft() - if not watch.is_triggered_by(frame) or (not watch.oneshot): - examined_watches.append(watch) + alive_watches = [] + while len(watches) > 0: + watch = watches.pop() - for watch in reversed(examined_watches): - deq.appendleft(watch) + if watch.cancelled: + continue - logger.critical('Unhandled frame %s, dropping', frame) + watch_triggered = watch.is_triggered_by(frame) + watch_handled |= watch_triggered - def watch_watchdog(self, delay, callback): + if (not watch_triggered) or (not watch.oneshot): + # Watch remains alive if it was NOT triggered, or it's NOT a oneshot + alive_watches.append(watch) + + for watch in alive_watches: + watches.append(watch) + + if not watch_handled: + logger.critical('Unhandled frame %s', frame) + + def watchdog(self, delay, callback): """ Call callback in delay seconds. One-shot. + + Shall the connection die in the meantime, watchdog will not + be called, and everything will process according to + ListenerThread's on_fail callback. """ self.listener_socket.oneshot(delay, callback) @@ -105,18 +180,17 @@ class Connection(object): Register a watch. :param watch: Watch to register """ - if isinstance(watch, FailWatch): - self.fail_watches.append(watch) + if watch.channel not in self.watches: + self.watches[watch.channel] = collections.deque([watch]) else: - if watch.channel not in self.watches: - self.watches[watch.channel] = collections.deque([watch]) - else: - self.watches[watch.channel].append(watch) + self.watches[watch.channel].append(watch) def watch_for_method(self, channel, method, callback): """ :param channel: channel to monitor - :param method: AMQPMethodPayload class + :param method: AMQPMethodPayload class or tuple of AMQPMethodPayload classes :param callback: callable(AMQPMethodPayload instance) """ - self.watch(MethodWatch(channel, method, callback)) + mw = MethodWatch(channel, method, callback) + self.watch(mw) + return mw diff --git a/coolamqp/connection/publishing.py b/coolamqp/uplink/connection/states.py similarity index 62% rename from coolamqp/connection/publishing.py rename to coolamqp/uplink/connection/states.py index 9f2b35b..9f340b2 100644 --- a/coolamqp/connection/publishing.py +++ b/coolamqp/uplink/connection/states.py @@ -1,2 +1,6 @@ # coding=UTF-8 from __future__ import absolute_import, division, print_function + +ST_OFFLINE = 0 +ST_CONNECTING = 1 +ST_ONLINE = 2 \ No newline at end of file diff --git a/coolamqp/uplink/connection/watches.py b/coolamqp/uplink/connection/watches.py index eaf81e9..536147e 100644 --- a/coolamqp/uplink/connection/watches.py +++ b/coolamqp/uplink/connection/watches.py @@ -2,7 +2,6 @@ from __future__ import absolute_import, division, print_function from coolamqp.framing.frames import AMQPMethodFrame, AMQPHeartbeatFrame -from coolamqp.framing.base import AMQPMethodPayload class Watch(object): @@ -17,6 +16,7 @@ class Watch(object): """ self.channel = channel self.oneshot = oneshot + self.cancelled = False def is_triggered_by(self, frame): """ @@ -33,6 +33,14 @@ class Watch(object): link has failed """ + def cancel(self): + """ + Called by watch's user. This watch will not receive events anymore + (whether about frame or fail), and it will be discarded upon next iteration. + """ + self.cancelled = True + + class FailWatch(Watch): """ A special kind of watch that fires when connection has died @@ -75,10 +83,10 @@ class MethodWatch(Watch): """ Watch.__init__(self, channel, True) self.callable = callable - if issubclass(method_or_methods, AMQPMethodPayload): - self.methods = (method_or_methods, ) - else: + if isinstance(method_or_methods, (list, tuple)): self.methods = tuple(method_or_methods) + else: + self.methods = method_or_methods self.on_end = on_end def failed(self): diff --git a/coolamqp/uplink/handshake.py b/coolamqp/uplink/handshake.py index 3cba0c0..4b5b25a 100644 --- a/coolamqp/uplink/handshake.py +++ b/coolamqp/uplink/handshake.py @@ -3,13 +3,15 @@ from __future__ import absolute_import, division, print_function """ Provides reactors that can authenticate an AQMP session """ - +import six from coolamqp.framing.definitions import ConnectionStart, ConnectionStartOk, \ - ConnectionTune, ConnectionTuneOk, ConnectionOpen, ConnectionOpenOk + ConnectionTune, ConnectionTuneOk, ConnectionOpen, ConnectionOpenOk, ConnectionClose from coolamqp.framing.frames import AMQPMethodFrame +from coolamqp.uplink.connection.states import ST_ONLINE -ST_AWAITING_CONNECTIONSTART = 0 -ST_CONNECTIONSTARTOK_SENT = 1 +SUPPORTED_EXTENSIONS = [ + b'publisher_confirms' +] CLIENT_DATA = [ # because RabbitMQ is some kind of a fascist and does not allow @@ -18,53 +20,48 @@ CLIENT_DATA = [ (b'version', (b'develop', b'S')), (b'copyright', (b'Copyright (C) 2016-2017 DMS Serwis', b'S')), (b'information', (b'Licensed under the MIT License.\nSee https://github.com/smok-serwis/coolamqp for details', b'S')), - (b'capabilities', ([ - (b'consumer_cancel_notify', (True, b't')), - (b'connection.blocked', (True, b't')) - ], b'F')) + (b'capabilities', ([(capa, (True, b't')) for capa in SUPPORTED_EXTENSIONS], b'F')), ] +WATCHDOG_TIMEOUT = 10 + class Handshaker(object): """ Object that given a connection rolls the handshake. """ - def __init__(self, connection, login, password, virtual_host, - on_success, on_fail, heartbeat=0): + def __init__(self, connection, node_definition, + on_success): """ :param connection: Connection instance to use - :param login: login to try - :param password: password to try - :param virtual_host: virtual_host to pick + :type node_definition: NodeDefinition :param on_success: callable/0, on success - :param on_fail: callable/0, on failure - :param heartbeat: heartbeat to requisition """ self.connection = connection - self.login = login - self.password = password - self.virtual_host = virtual_host + self.login = node_definition.user.encode('utf8') + self.password = node_definition.password.encode('utf8') + self.virtual_host = node_definition.virtual_host.encode('utf8') + self.heartbeat = node_definition.heartbeat or 0 self.connection.watch_for_method(0, ConnectionStart, self.on_connection_start) # Callbacks self.on_success = on_success - self.on_fail = on_fail - - # Negotiated parameters - self.channel_max = None - self.frame_max = None - self.heartbeat = heartbeat - - self.connected = False + # Called by internal setup def on_watchdog(self): - if not self.connected: - # Not connected in 20 seconds - abort - self.connection.send(None, 'connection not established within 20 seconds') - self.on_fail() + """ + Called WATCHDOG_TIMEOUT seconds after setup begins + + If we are not ST_ONLINE after that much, something is wrong and pwn this connection. + """ + # Not connected in 20 seconds - abort + if self.connection.state != ST_ONLINE: + # closing the connection this way will get to Connection by channels of ListenerThread + self.connection.send(None) def on_connection_start(self, payload): + sasl_mechanisms = payload.mechanisms.split(b' ') locale_supported = payload.locales.split(b' ') @@ -72,36 +69,40 @@ class Handshaker(object): if b'PLAIN' not in sasl_mechanisms: raise ValueError('Server does not support PLAIN') - self.connection.watch_watchdog(20, self.on_watchdog) + # Select capabilities + server_props = dict(payload.server_properties) + if b'capabilities' in server_props: + for label, fv in server_props[b'capabilities'][0]: + if label in SUPPORTED_EXTENSIONS: + if fv[0]: + self.connection.extensions.append(label) + + self.connection.watchdog(WATCHDOG_TIMEOUT, self.on_watchdog) self.connection.watch_for_method(0, ConnectionTune, self.on_connection_tune) self.connection.send([ AMQPMethodFrame(0, ConnectionStartOk(CLIENT_DATA, b'PLAIN', - b'\x00' + self.login.encode('utf8') + b'\x00' + self.password.encode( - 'utf8'), + b'\x00' + self.login + b'\x00' + self.password, locale_supported[0] )) - ], 'connecting') + ]) def on_connection_tune(self, payload): - print('Channel max: ', payload.channel_max, 'Frame max: ', payload.frame_max, 'Heartbeat: ', payload.heartbeat) - - self.channel_max = 65535 if payload.channel_max == 0 else payload.channel_max - self.frame_max = payload.frame_max - self.heartbeat = min(payload.heartbeat, self.heartbeat) + self.connection.frame_max = payload.frame_max + self.connection.heartbeat = min(payload.heartbeat, self.heartbeat) + for channel in six.moves.xrange(1, (65535 if payload.channel_max == 0 else payload.channel_max)+1): + self.connection.free_channels.append(channel) self.connection.watch_for_method(0, ConnectionOpenOk, self.on_connection_open_ok) self.connection.send([ - AMQPMethodFrame(0, ConnectionTuneOk(self.channel_max, self.frame_max, self.heartbeat)), + AMQPMethodFrame(0, ConnectionTuneOk(payload.channel_max, payload.frame_max, self.connection.heartbeat)), AMQPMethodFrame(0, ConnectionOpen(self.virtual_host)) - ], 'connecting') + ]) # Install heartbeat handlers NOW, if necessary - if self.heartbeat > 0: + if self.connection.heartbeat > 0: from coolamqp.uplink.heartbeat import Heartbeater - Heartbeater(self.connection, self.heartbeat) + Heartbeater(self.connection, self.connection.heartbeat) def on_connection_open_ok(self, payload): - print('Connection opened OK!') self.on_success() - self.connected = True diff --git a/coolamqp/uplink/heartbeat.py b/coolamqp/uplink/heartbeat.py index 1804c52..79cca87 100644 --- a/coolamqp/uplink/heartbeat.py +++ b/coolamqp/uplink/heartbeat.py @@ -16,7 +16,7 @@ class Heartbeater(object): self.last_heartbeat_on = monotonic.monotonic() # last heartbeat from server - self.connection.watch_watchdog(self.heartbeat_interval, self.on_timer) + self.connection.watchdog(self.heartbeat_interval, self.on_timer) self.connection.watch(HeartbeatWatch(self.on_heartbeat)) def on_heartbeat(self): @@ -24,11 +24,11 @@ class Heartbeater(object): def on_timer(self): """Timer says we should send a heartbeat""" - self.connection.send([AMQPHeartbeatFrame()], 'heartbeat') + self.connection.send([AMQPHeartbeatFrame()]) if (monotonic.monotonic() - self.last_heartbeat_on) > 2*self.heartbeat_interval: # closing because of heartbeat - self.connection.send(None, 'heartbeat expired') + self.connection.send(None) - self.connection.watch_watchdog(self.heartbeat_interval, self.on_timer) + self.connection.watchdog(self.heartbeat_interval, self.on_timer) diff --git a/coolamqp/uplink/listener/__init__.py b/coolamqp/uplink/listener/__init__.py index dff06e0..897550a 100644 --- a/coolamqp/uplink/listener/__init__.py +++ b/coolamqp/uplink/listener/__init__.py @@ -2,6 +2,8 @@ """ A listener is a thread that monitors a bunch of sockets for activity. +Think "asyncio core" but I couldn't be bothered to learn Twisted. + It provides both for sending and receiving messages. It is written as a package, because the optimal network call, epoll, is not available on Windows, and you might just want to use it. diff --git a/coolamqp/uplink/transcript.py b/coolamqp/uplink/transcript.py deleted file mode 100644 index 07044c1..0000000 --- a/coolamqp/uplink/transcript.py +++ /dev/null @@ -1,32 +0,0 @@ -# coding=UTF-8 -from __future__ import absolute_import, division, print_function - -from coolamqp.framing.frames import AMQPMethodFrame - - -class SessionTranscript(object): - """ - For debugging you may wish to enable logging of the AMQP session - """ - - def on_close_client(self, reason=None): - """Uplink is being terminated client-side""" - print('Closed client side, reason is ', reason) - - def on_fail(self): - """Uplink closed server-side""" - print('Uplink terminated.') - - def on_frame(self, frame): - """Received a frame""" - if isinstance(frame, AMQPMethodFrame): - print('RECEIVED', frame.payload.NAME) - else: - print('RECEIVED ', frame) - - def on_send(self, frame, reason=None): - """Frames are being relayed""" - if isinstance(frame, AMQPMethodFrame): - print ('SENT', frame.payload.NAME, 'because', reason) - else: - print ('SENT', frame, 'because', reason) \ No newline at end of file diff --git a/tests/run.py b/tests/run.py index d74a933..7f49260 100644 --- a/tests/run.py +++ b/tests/run.py @@ -2,21 +2,21 @@ from __future__ import absolute_import, division, print_function from coolamqp.uplink import ListenerThread import time -from coolamqp.connection.state import Broker from coolamqp.connection import NodeDefinition - +from coolamqp.uplink import Connection +import logging NODE = NodeDefinition('127.0.0.1', 5672, 'user', 'user', heartbeat=5) - +logging.basicConfig(level=logging.INFO) if __name__ == '__main__': lt = ListenerThread() lt.start() - broker = Broker.from_node_def(NODE, lt) + con = Connection(NODE, lt) - broker.connect().wait() + con.start() time.sleep(50) -- GitLab