diff --git a/coolamqp/framing/frames.py b/coolamqp/framing/frames.py index 2c05f9316a4d504ba0ab1368d6c314417928cda8..d842d37297fd10ede76ff2f4674063bab171dfdd 100644 --- a/coolamqp/framing/frames.py +++ b/coolamqp/framing/frames.py @@ -110,8 +110,8 @@ class AMQPBodyFrame(AMQPFrame): class AMQPHeartbeatFrame(AMQPFrame): FRAME_TYPE = FRAME_HEARTBEAT - LENGTH = 6 - DATA = chr(FRAME_HEARTBEAT)+'\x00\x00\x00\x00\xCE' + LENGTH = 8 + DATA = struct.pack('!BHLB', FRAME_HEARTBEAT, 0, 0, FRAME_END) def __init__(self): AMQPFrame.__init__(self, 0) diff --git a/coolamqp/uplink/__init__.py b/coolamqp/uplink/__init__.py index 8405c1aa77024152f5029d49638fe9624a8ba310..f124517d83d00b09a9d5add7cedaf3b2fa6153bc 100644 --- a/coolamqp/uplink/__init__.py +++ b/coolamqp/uplink/__init__.py @@ -7,4 +7,5 @@ They can also send frames themselves. from __future__ import absolute_import, division, print_function from coolamqp.uplink.connection import Connection +from coolamqp.uplink.handshake import Handshaker from coolamqp.uplink.listener import ListenerThread diff --git a/coolamqp/uplink/connection/connection.py b/coolamqp/uplink/connection/connection.py index 7d8a88dd3aa588910e37c710c44d0d1b541f6a7b..20c2aa24371449fc12b6fa90f2f75d4258f21591 100644 --- a/coolamqp/uplink/connection/connection.py +++ b/coolamqp/uplink/connection/connection.py @@ -6,7 +6,7 @@ from coolamqp.uplink.listener import ListenerThread from coolamqp.uplink.connection.recv_framer import ReceivingFramer from coolamqp.uplink.connection.send_framer import SendingFramer -from coolamqp.framing.frames import AMQPMethodFrame +from coolamqp.framing.frames import AMQPMethodFrame, AMQPHeartbeatFrame logger = logging.getLogger(__name__) @@ -24,9 +24,14 @@ class Connection(object): self.socketobject = socketobject self.recvf = ReceivingFramer(self.on_frame) self.failed = False + self.transcript = None self.method_watches = {} # channel => [AMQPMethodPayload instance, callback] + # to call if an unwatched frame is caught + self.on_heartbeat = lambda: None + self.unwatched_frame = lambda frame: None # callable(AMQPFrame instance) + def start(self): """ Start processing events for this connect @@ -38,26 +43,50 @@ class Connection(object): self.sendf = SendingFramer(self.listener_socket.send) def on_fail(self): + if self.transcript is not None: + self.transcript.on_fail() self.failed = True - def send(self, frames): + def send(self, frames, reason=None): """ :param frames: list of frames or None to close the link + :param reason: optional human-readable reason for this action """ if not self.failed: if frames is not None: self.sendf.send(frames) + if self.transcript is not None: + for frame in frames: + self.transcript.on_send(frame, reason) else: self.listener_socket.send(None) self.failed = True + if self.transcript is not None: + self.transcript.on_close_client(reason) + def on_frame(self, frame): + if self.transcript is not None: + self.transcript.on_frame(frame) + if isinstance(frame, AMQPMethodFrame): if frame.channel in self.method_watches: if isinstance(frame.payload, self.method_watches[frame.channel][0]): method, callback = self.method_watches[frame.channel].popleft() callback(frame.payload) + return + + if isinstance(frame, AMQPHeartbeatFrame): + self.on_heartbeat() + return + self.unwatched_frame(frame) + + def watch_watchdog(self, delay, callback): + """ + Call callback in delay seconds. One-shot. + """ + self.listener_socket.oneshot(delay, callback) def watch_for_method(self, channel, method, callback): """ diff --git a/coolamqp/uplink/connection/recv_framer.py b/coolamqp/uplink/connection/recv_framer.py index f1f9efd6b148164e50a8384522295ad57a1de3df..1075f52066130fc8c87982e2ef54a0d048546047 100644 --- a/coolamqp/uplink/connection/recv_framer.py +++ b/coolamqp/uplink/connection/recv_framer.py @@ -28,7 +28,7 @@ class ReceivingFramer(object): State machine (frame_type is None) and has_bytes(1) -> (frame_type <- bytes(1)) - (frame_type is HEARTBEAT) and has_bytes(3) -> (output_frame, frame_type <- None) + (frame_type is HEARTBEAT) and has_bytes(AMQPHeartbeatFrame.LENGTH-1) -> (output_frame, frame_type <- None) (frame_type is not HEARTBEAT and not None) and has_bytes(6) -> (frame_channel <- bytes(2), frame_size <- bytes(4)) @@ -82,12 +82,12 @@ class ReceivingFramer(object): return True # state rule 2 - elif (self.frame_type == FRAME_HEARTBEAT) and (self.total_data_len > 3): + elif (self.frame_type == FRAME_HEARTBEAT) and (self.total_data_len >= AMQPHeartbeatFrame.LENGTH-1): data = b'' - while len(data) < 3: - data = data + self._extract(3 - len(data)) + while len(data) < AMQPHeartbeatFrame.LENGTH-1: + data = data + six.binary_type(self._extract(AMQPHeartbeatFrame.LENGTH-1 - len(data))) - if data != AMQPHeartbeatFrame.DATA: + if data != AMQPHeartbeatFrame.DATA[1:]: # Invalid heartbeat frame! raise ValueError('Invalid AMQP heartbeat') diff --git a/coolamqp/uplink/connection/send_framer.py b/coolamqp/uplink/connection/send_framer.py index bc5c9c8a2c39dd047bf116da03f195decc70b458..14b76fdcc2436d8ac4499774014cddc8c213908d 100644 --- a/coolamqp/uplink/connection/send_framer.py +++ b/coolamqp/uplink/connection/send_framer.py @@ -43,8 +43,6 @@ class SendingFramer(object): for frame in frames: frame.write_to(buf) - print('Writing ', repr(frame), repr(frame.payload)) q = buf.getvalue() - print(repr(q)) self.on_send(q) diff --git a/coolamqp/handshake/__init__.py b/coolamqp/uplink/handshake.py similarity index 83% rename from coolamqp/handshake/__init__.py rename to coolamqp/uplink/handshake.py index 3c232c073f641ec57b3261890809dd8ff458bf35..ce097cff8e3fad8c92c82f085f2005ce6f9130f1 100644 --- a/coolamqp/handshake/__init__.py +++ b/coolamqp/uplink/handshake.py @@ -23,7 +23,7 @@ CLIENT_DATA = [ class Handshaker(object): """ - Object that given a connection rolls the handshake + Object that given a connection rolls the handshake. """ @@ -53,6 +53,14 @@ class Handshaker(object): self.frame_max = None self.heartbeat = heartbeat + self.connected = False + + def on_watchdog(self): + if not self.connected: + # Not connected in 20 seconds - abort + self.connection.send(None, 'connection not established within 20 seconds') + self.on_fail() + def on_connection_start(self, payload): sasl_mechanisms = payload.mechanisms.split(b' ') locale_supported = payload.locales.split(b' ') @@ -61,6 +69,7 @@ class Handshaker(object): if b'PLAIN' not in sasl_mechanisms: raise ValueError('Server does not support PLAIN') + self.connection.watch_watchdog(20, self.on_watchdog) self.connection.watch_for_method(0, ConnectionTune, self.on_connection_tune) self.connection.send([ AMQPMethodFrame(0, @@ -69,7 +78,7 @@ class Handshaker(object): 'utf8'), locale_supported[0] )) - ]) + ], 'connecting') def on_connection_tune(self, payload): print('Channel max: ', payload.channel_max, 'Frame max: ', payload.frame_max, 'Heartbeat: ', payload.heartbeat) @@ -82,8 +91,14 @@ class Handshaker(object): self.connection.send([ AMQPMethodFrame(0, ConnectionTuneOk(self.channel_max, self.frame_max, self.heartbeat)), AMQPMethodFrame(0, ConnectionOpen(self.virtual_host)) - ]) + ], 'connecting') + + # Install heartbeat handlers NOW, if necessary + if self.heartbeat > 0: + from coolamqp.uplink.heartbeat import Heartbeater + Heartbeater(self.connection, self.heartbeat) def on_connection_open_ok(self, payload): print('Connection opened OK!') self.on_success() + self.connected = True diff --git a/coolamqp/uplink/heartbeat.py b/coolamqp/uplink/heartbeat.py new file mode 100644 index 0000000000000000000000000000000000000000..ddf7f43265c06b15057366f335300fa6baa01859 --- /dev/null +++ b/coolamqp/uplink/heartbeat.py @@ -0,0 +1,34 @@ +# coding=UTF-8 +from __future__ import absolute_import, division, print_function +import monotonic + +from coolamqp.framing.frames import AMQPHeartbeatFrame + + +class Heartbeater(object): + """ + An object that handles heartbeats + """ + + def __init__(self, connection, heartbeat_interval=0): + self.connection = connection + self.heartbeat_interval = heartbeat_interval + + self.last_heartbeat_on = monotonic.monotonic() # last heartbeat from server + + self.connection.watch_watchdog(self.heartbeat_interval, self.on_timer) + self.connection.on_heartbeat = self.on_heartbeat + + def on_heartbeat(self): + self.last_heartbeat_on = monotonic.monotonic() + + def on_timer(self): + """Timer says we should send a heartbeat""" + self.connection.send([AMQPHeartbeatFrame()], 'heartbeat') + + if (monotonic.monotonic() - self.last_heartbeat_on) > 2*self.heartbeat_interval: + # closing because of heartbeat + self.connection.send(None, 'heartbeat expired') + + self.connection.watch_watchdog(self.heartbeat_interval, self.on_timer) + diff --git a/coolamqp/uplink/listener/epoll_listener.py b/coolamqp/uplink/listener/epoll_listener.py index 496aa0667708e0fd2d9236305dc266b976401d43..1c4e800d2b23c9a12aa91baa9363df0cf770d9a1 100644 --- a/coolamqp/uplink/listener/epoll_listener.py +++ b/coolamqp/uplink/listener/epoll_listener.py @@ -30,7 +30,6 @@ class EpollSocket(BaseSocket): self.data_to_send.append(data) self.listener.epoll.modify(self, self.get_epoll_eventset()) - def oneshot(self, seconds_after, callable): """ Set to fire a callable N seconds after @@ -119,6 +118,8 @@ class EpollListener(object): sock.fileno(), callback )) + else: + print('oneshot from nowhere') def register(self, sock, on_read=lambda data: None, on_fail=lambda: None): diff --git a/coolamqp/uplink/listener/socket.py b/coolamqp/uplink/listener/socket.py index 5f5639d881365cc3ed9cc3ba5c4b5c3a3d93e72b..2f97b256f7b3d21569f9dfa4836f872726308f99 100644 --- a/coolamqp/uplink/listener/socket.py +++ b/coolamqp/uplink/listener/socket.py @@ -70,8 +70,6 @@ class BaseSocket(object): except (IOError, socket.error): raise SocketFailed() - print('Got ',repr(data)) - if len(data) == 0: raise SocketFailed() diff --git a/coolamqp/uplink/transcript.py b/coolamqp/uplink/transcript.py new file mode 100644 index 0000000000000000000000000000000000000000..07044c1c6d904874b0f8a42e79ac2113ff2bf69a --- /dev/null +++ b/coolamqp/uplink/transcript.py @@ -0,0 +1,32 @@ +# coding=UTF-8 +from __future__ import absolute_import, division, print_function + +from coolamqp.framing.frames import AMQPMethodFrame + + +class SessionTranscript(object): + """ + For debugging you may wish to enable logging of the AMQP session + """ + + def on_close_client(self, reason=None): + """Uplink is being terminated client-side""" + print('Closed client side, reason is ', reason) + + def on_fail(self): + """Uplink closed server-side""" + print('Uplink terminated.') + + def on_frame(self, frame): + """Received a frame""" + if isinstance(frame, AMQPMethodFrame): + print('RECEIVED', frame.payload.NAME) + else: + print('RECEIVED ', frame) + + def on_send(self, frame, reason=None): + """Frames are being relayed""" + if isinstance(frame, AMQPMethodFrame): + print ('SENT', frame.payload.NAME, 'because', reason) + else: + print ('SENT', frame, 'because', reason) \ No newline at end of file diff --git a/tests/run.py b/tests/run.py index fb842333a3cbb04970f3590a4b9ef1911d688da7..74933822cdf47aec324f0432086cfd100f7ef67f 100644 --- a/tests/run.py +++ b/tests/run.py @@ -4,6 +4,8 @@ from coolamqp.uplink import ListenerThread, Connection import socket import time +from coolamqp.uplink.transcript import SessionTranscript + def newc(): s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) @@ -13,17 +15,18 @@ def newc(): return s -from coolamqp.handshake import Handshaker +from coolamqp.uplink import Handshaker if __name__ == '__main__': lt = ListenerThread() lt.start() con = Connection(newc(), lt) + con.transcript = SessionTranscript() - handshaker = Handshaker(con, 'user', 'user', '/') + handshaker = Handshaker(con, 'user', 'user', '/', lambda: None, lambda: None, heartbeat=10) con.start() - time.sleep(5) + time.sleep(50) lt.terminate() diff --git a/tests/test_uplink/test_basic.py b/tests/test_uplink/test_basic.py index 81c3a4cfe2cf441c5258bca5794acadaf105ff5b..034d7e9aad28a57f19e70a347a4e5254df5a12ef 100644 --- a/tests/test_uplink/test_basic.py +++ b/tests/test_uplink/test_basic.py @@ -2,8 +2,7 @@ from __future__ import absolute_import, division, print_function import unittest -from coolamqp.handshake import Handshaker -from coolamqp.uplink import ListenerThread, Connection +from coolamqp.uplink import ListenerThread, Connection, Handshaker import socket import time @@ -35,3 +34,24 @@ class TestBasic(unittest.TestCase): lt.terminate() self.assertTrue(hnd_ok['ok']) + + + def test_heartbeats(self): + + hnd_ok = {'ok': False} + def hnd_suc(): + hnd_ok['ok'] = True + + lt = ListenerThread() + lt.start() + + con = Connection(newc(), lt) + + Handshaker(con, 'user', 'user', '/', hnd_suc, lambda: None, 3) + con.start() + + time.sleep(20) + + self.assertFalse(con.failed) + lt.terminate() + self.assertTrue(hnd_ok['ok'])