diff --git a/LICENSE b/LICENSE index 9d8f6d330a52f91c22a43fb71b6ce3963c69613c..a08e0765bf9ed2266701bc5fbe3f1c2a767e0a0a 100644 --- a/LICENSE +++ b/LICENSE @@ -21,4 +21,4 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. -resources/amqp-0-9-1.xml: Copyright (c) 2016 OASIS. All rights reserved. \ No newline at end of file +resources/*: Copyright (c) 2016 OASIS \ No newline at end of file diff --git a/coolamqp/framing/base.py b/coolamqp/framing/base.py index 5f7599891c14a1190ce760459d5b61e50101577b..08d7aaed17f59c0429f08f68437f7661e1b3deb0 100644 --- a/coolamqp/framing/base.py +++ b/coolamqp/framing/base.py @@ -135,7 +135,7 @@ class AMQPMethodPayload(AMQPPayload): Write own content to target buffer - starting from LENGTH, ending on FRAME_END :param buf: target buffer """ - from coolamqp.framing import FRAME_END + from coolamqp.framing.definitions import FRAME_END if self.IS_CONTENT_STATIC: buf.write(self.STATIC_CONTENT) diff --git a/coolamqp/framing/compilation/textcode_fields.py b/coolamqp/framing/compilation/textcode_fields.py index 3b52d7fd28e87dfc0268813a0f77666904d29663..fee06ffaf2848c544c546b497ab7448d5a8c24b9 100644 --- a/coolamqp/framing/compilation/textcode_fields.py +++ b/coolamqp/framing/compilation/textcode_fields.py @@ -56,7 +56,7 @@ def get_counter(fields, prefix='', indent_level=2): accumulator += 4 elif bt == 'table': parts.append('frame_table_size(' + nam + ')') - accumulator += 4 + accumulator += 0 # because frame_table_size accounts for that 4 leading bytes else: raise Exception() @@ -205,16 +205,20 @@ def get_serializer(fields, prefix='', indent_level=2): bits.append("False") else: bits.append(nam) - elif field.basic_type in ('shortstr', 'longstr'): - formats.append('B' if field.basic_type == 'shortstr' else 'I') - format_args.append('len('+nam+')') - emit_single_struct_pack() - emit('buf.write(%s)', nam) - elif field.basic_type == 'table': - emit('enframe_table(buf, %s)', nam) + elif field.reserved: + # Just pasta + emit('buf.write(%s)', BASIC_TYPES[field.basic_type][2]) else: - formats.append(BASIC_TYPES[field.basic_type][1]) - format_args.append(nam) + if field.basic_type in ('shortstr', 'longstr'): + formats.append('B' if field.basic_type == 'shortstr' else 'I') + format_args.append('len('+nam+')') + emit_single_struct_pack() + emit('buf.write(%s)', nam) + elif field.basic_type == 'table': + emit('enframe_table(buf, %s)', nam) + else: + formats.append(BASIC_TYPES[field.basic_type][1]) + format_args.append(nam) if len(bits) > 0: emit_bits() diff --git a/coolamqp/framing/definitions.py b/coolamqp/framing/definitions.py index 509bc749549886cce0c5ed2777c6d008aa678c64..d45411e25763a68a0d3648604d0c85b7a467e123 100644 --- a/coolamqp/framing/definitions.py +++ b/coolamqp/framing/definitions.py @@ -244,8 +244,7 @@ class ConnectionOpen(AMQPMethodPayload): def write_arguments(self, buf): buf.write(struct.pack('!B', len(self.virtual_host))) buf.write(self.virtual_host) - buf.write(struct.pack('!B', len(self.reserved_1))) - buf.write(self.reserved_1) + buf.write(b'\x00') buf.write(struct.pack('!B', 0)) def get_size(self): @@ -370,7 +369,7 @@ class ConnectionStart(AMQPMethodPayload): buf.write(self.locales) def get_size(self): - return 14 + frame_table_size(self.server_properties) + len(self.mechanisms) + len(self.locales) + return 10 + frame_table_size(self.server_properties) + len(self.mechanisms) + len(self.locales) @staticmethod def from_buffer(buf, start_offset): @@ -503,7 +502,7 @@ class ConnectionStartOk(AMQPMethodPayload): buf.write(self.locale) def get_size(self): - return 10 + frame_table_size(self.client_properties) + len(self.mechanism) + len(self.response) + len(self.locale) + return 6 + frame_table_size(self.client_properties) + len(self.mechanism) + len(self.response) + len(self.locale) @staticmethod def from_buffer(buf, start_offset): @@ -1056,7 +1055,8 @@ class ExchangeDeclare(AMQPMethodPayload): self.arguments = arguments def write_arguments(self, buf): - buf.write(struct.pack('!HB', self.reserved_1, len(self.exchange))) + buf.write(b'\x00\x00') + buf.write(struct.pack('!B', len(self.exchange))) buf.write(self.exchange) buf.write(struct.pack('!B', len(self.type_))) buf.write(self.type_) @@ -1064,7 +1064,7 @@ class ExchangeDeclare(AMQPMethodPayload): buf.write(struct.pack('!B', (self.passive << 0) | (self.durable << 1) | (self.no_wait << 4))) def get_size(self): - return 9 + len(self.exchange) + len(self.type_) + frame_table_size(self.arguments) + return 5 + len(self.exchange) + len(self.type_) + frame_table_size(self.arguments) @staticmethod def from_buffer(buf, start_offset): @@ -1130,7 +1130,8 @@ class ExchangeDelete(AMQPMethodPayload): self.no_wait = no_wait def write_arguments(self, buf): - buf.write(struct.pack('!HB', self.reserved_1, len(self.exchange))) + buf.write(b'\x00\x00') + buf.write(struct.pack('!B', len(self.exchange))) buf.write(self.exchange) buf.write(struct.pack('!B', (self.if_unused << 0) | (self.no_wait << 1))) @@ -1282,7 +1283,8 @@ class QueueBind(AMQPMethodPayload): self.arguments = arguments def write_arguments(self, buf): - buf.write(struct.pack('!HB', self.reserved_1, len(self.queue))) + buf.write(b'\x00\x00') + buf.write(struct.pack('!B', len(self.queue))) buf.write(self.queue) buf.write(struct.pack('!B', len(self.exchange))) buf.write(self.exchange) @@ -1292,7 +1294,7 @@ class QueueBind(AMQPMethodPayload): buf.write(struct.pack('!B', (self.no_wait << 0))) def get_size(self): - return 10 + len(self.queue) + len(self.exchange) + len(self.routing_key) + frame_table_size(self.arguments) + return 6 + len(self.queue) + len(self.exchange) + len(self.routing_key) + frame_table_size(self.arguments) @staticmethod def from_buffer(buf, start_offset): @@ -1425,13 +1427,14 @@ class QueueDeclare(AMQPMethodPayload): self.arguments = arguments def write_arguments(self, buf): - buf.write(struct.pack('!HB', self.reserved_1, len(self.queue))) + buf.write(b'\x00\x00') + buf.write(struct.pack('!B', len(self.queue))) buf.write(self.queue) enframe_table(buf, self.arguments) buf.write(struct.pack('!B', (self.passive << 0) | (self.durable << 1) | (self.exclusive << 2) | (self.auto_delete << 3) | (self.no_wait << 4))) def get_size(self): - return 8 + len(self.queue) + frame_table_size(self.arguments) + return 4 + len(self.queue) + frame_table_size(self.arguments) @staticmethod def from_buffer(buf, start_offset): @@ -1501,7 +1504,8 @@ class QueueDelete(AMQPMethodPayload): self.no_wait = no_wait def write_arguments(self, buf): - buf.write(struct.pack('!HB', self.reserved_1, len(self.queue))) + buf.write(b'\x00\x00') + buf.write(struct.pack('!B', len(self.queue))) buf.write(self.queue) buf.write(struct.pack('!B', (self.if_unused << 0) | (self.if_empty << 1) | (self.no_wait << 2))) @@ -1664,7 +1668,8 @@ class QueuePurge(AMQPMethodPayload): self.no_wait = no_wait def write_arguments(self, buf): - buf.write(struct.pack('!HB', self.reserved_1, len(self.queue))) + buf.write(b'\x00\x00') + buf.write(struct.pack('!B', len(self.queue))) buf.write(self.queue) buf.write(struct.pack('!B', (self.no_wait << 0))) @@ -1774,7 +1779,8 @@ class QueueUnbind(AMQPMethodPayload): self.arguments = arguments def write_arguments(self, buf): - buf.write(struct.pack('!HB', self.reserved_1, len(self.queue))) + buf.write(b'\x00\x00') + buf.write(struct.pack('!B', len(self.queue))) buf.write(self.queue) buf.write(struct.pack('!B', len(self.exchange))) buf.write(self.exchange) @@ -1783,7 +1789,7 @@ class QueueUnbind(AMQPMethodPayload): enframe_table(buf, self.arguments) def get_size(self): - return 9 + len(self.queue) + len(self.exchange) + len(self.routing_key) + frame_table_size(self.arguments) + return 5 + len(self.queue) + len(self.exchange) + len(self.routing_key) + frame_table_size(self.arguments) @staticmethod def from_buffer(buf, start_offset): @@ -2084,7 +2090,8 @@ class BasicConsume(AMQPMethodPayload): self.arguments = arguments def write_arguments(self, buf): - buf.write(struct.pack('!HB', self.reserved_1, len(self.queue))) + buf.write(b'\x00\x00') + buf.write(struct.pack('!B', len(self.queue))) buf.write(self.queue) buf.write(struct.pack('!B', len(self.consumer_tag))) buf.write(self.consumer_tag) @@ -2092,7 +2099,7 @@ class BasicConsume(AMQPMethodPayload): buf.write(struct.pack('!B', (self.no_local << 0) | (self.no_ack << 1) | (self.exclusive << 2) | (self.no_wait << 3))) def get_size(self): - return 9 + len(self.queue) + len(self.consumer_tag) + frame_table_size(self.arguments) + return 5 + len(self.queue) + len(self.consumer_tag) + frame_table_size(self.arguments) @staticmethod def from_buffer(buf, start_offset): @@ -2383,7 +2390,8 @@ class BasicGet(AMQPMethodPayload): self.no_ack = no_ack def write_arguments(self, buf): - buf.write(struct.pack('!HB', self.reserved_1, len(self.queue))) + buf.write(b'\x00\x00') + buf.write(struct.pack('!B', len(self.queue))) buf.write(self.queue) buf.write(struct.pack('!B', (self.no_ack << 0))) @@ -2574,7 +2582,8 @@ class BasicPublish(AMQPMethodPayload): self.immediate = immediate def write_arguments(self, buf): - buf.write(struct.pack('!HB', self.reserved_1, len(self.exchange))) + buf.write(b'\x00\x00') + buf.write(struct.pack('!B', len(self.exchange))) buf.write(self.exchange) buf.write(struct.pack('!B', len(self.routing_key))) buf.write(self.routing_key) diff --git a/coolamqp/framing/field_table.py b/coolamqp/framing/field_table.py index c5210cc566d2e1158eb575dcc2e4e740959a1f69..b53faa44c86b51039fb799b337a2694a2c29379c 100644 --- a/coolamqp/framing/field_table.py +++ b/coolamqp/framing/field_table.py @@ -30,23 +30,22 @@ def deframe_decimal(buf, offset): def deframe_shortstr(buf, offset): # -> value, bytes_eaten - ln = ord(buf[offset]) + ln, = struct.unpack_from('!B', buf, offset) return buf[offset+1:offset+1+ln], 1+ln def enframe_shortstr(buf, value): - buf.write(chr(len(value))) + buf.write(struct.pack('!B', len(value))) buf.write(value) def deframe_longstr(buf, offset): # -> value, bytes_eaten ln, = struct.unpack_from('!I', buf, offset) - offset += 4 - return buf[offset:offset+ln], 4 + ln + return buf[offset+4:offset+4+ln], 4 + ln def enframe_longstr(buf, value): - buf.write(struct.pack('!I', value)) + buf.write(struct.pack('!I', len(value))) buf.write(value) @@ -75,9 +74,9 @@ FIELD_TYPES = { def enframe_field_value(buf, fv): value, type = fv - buf.write(tp) + buf.write(type) - opt = FIELD_TYPES[tp] + opt = FIELD_TYPES[type] if opt[1] is not None: buf.write(struct.pack(opt[1], value)) @@ -154,9 +153,7 @@ def deframe_table(buf, start_offset): # -> (table, bytes_consumed) fields = [] while offset < (start_offset+table_length+4): - ln, = struct.unpack_from('!B', buf, offset) - offset += 1 - field_name = buf[offset:offset+ln] + field_name, ln = deframe_shortstr(buf, offset) offset += ln fv, delta = deframe_field_value(buf, offset) offset += delta @@ -184,12 +181,7 @@ def frame_array_size(array): def frame_table_size(table): """:return: length of table representation, in bytes, INCLUDING length header""" - size = 4 # length header - for k, fv in table: - v,t =fv - size += 1 + len(k) + frame_field_value_size(v, t) - - return size + return 4 + sum(1 + len(k) + frame_field_value_size(fv) for k, fv in table) FIELD_TYPES['A'] = (None, None, enframe_array, deframe_array, frame_array_size) diff --git a/coolamqp/framing/frames.py b/coolamqp/framing/frames.py index 2577d4e3ae2b515c9fd722937cb42b73b28402b5..2c05f9316a4d504ba0ab1368d6c314417928cda8 100644 --- a/coolamqp/framing/frames.py +++ b/coolamqp/framing/frames.py @@ -23,29 +23,30 @@ class AMQPMethodFrame(AMQPFrame): self.payload = payload def write_to(self, buf): - AMQPFrame.write_to(self, buf) - self.payload.write_to(buf) + if self.payload.IS_CONTENT_STATIC: + buf.write(struct.pack('!BH', FRAME_METHOD, self.channel)) + buf.write(self.payload.STATIC_CONTENT) + else: + buf.write(struct.pack('!BHL', FRAME_METHOD, self.channel, + 4 + self.payload.get_size())) + buf.write(self.payload.BINARY_HEADER) + self.payload.write_arguments(buf) + buf.write(chr(FRAME_END)) @staticmethod def unserialize(channel, payload_as_buffer): - print('Going to unser a methodframe') clsmet = struct.unpack_from('!HH', payload_as_buffer, 0) - print('Cls:Met=', clsmet) - try: method_payload_class = IDENT_TO_METHOD[clsmet] payload = method_payload_class.from_buffer(payload_as_buffer, 4) - except Exception as e: - print(repr(e)) - raise except KeyError: raise ValueError('Invalid class %s method %s' % clsmet) else: return AMQPMethodFrame(channel, payload) def get_size(self): - # frame header is always 7, frame end is 1, class + method is 4 + # frame_header = (method(1) + channel(2) + length(4) + class(2) + method(2) + payload(N) + frame_end(1)) return 12 + self.payload.get_size() diff --git a/coolamqp/handshake/__init__.py b/coolamqp/handshake/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..3c232c073f641ec57b3261890809dd8ff458bf35 --- /dev/null +++ b/coolamqp/handshake/__init__.py @@ -0,0 +1,89 @@ +# coding=UTF-8 +from __future__ import absolute_import, division, print_function +""" +Provides reactors that can authenticate an AQMP session +""" + +from coolamqp.framing.definitions import ConnectionStart, ConnectionStartOk, \ + ConnectionTune, ConnectionTuneOk, ConnectionOpen, ConnectionOpenOk +from coolamqp.framing.frames import AMQPMethodFrame + +ST_AWAITING_CONNECTIONSTART = 0 +ST_CONNECTIONSTARTOK_SENT = 1 + +CLIENT_DATA = [ + # because RabbitMQ is some kind of a fascist and does not allow + # these fields to be of type short-string + (b'product', (b'CoolAMQP', b'S')), + (b'version', (b'1.0', b'S')), + (b'copyright', (b'Copyright (C) 2016 DMS Serwis', b'S')), + (b'information', (b'Licensed under the MIT License. See https://github.com/smok-serwis/coolamqp for details', b'S')) + ] + + +class Handshaker(object): + """ + Object that given a connection rolls the handshake + """ + + + def __init__(self, connection, login, password, virtual_host, + on_success, on_fail, heartbeat=0): + """ + :param connection: Connection instance to use + :param login: login to try + :param password: password to try + :param virtual_host: virtual_host to pick + :param on_success: callable/0, on success + :param on_fail: callable/0, on failure + :param heartbeat: heartbeat to requisition + """ + self.connection = connection + self.login = login + self.password = password + self.virtual_host = virtual_host + self.connection.watch_for_method(0, ConnectionStart, self.on_connection_start) + + # Callbacks + self.on_success = on_success + self.on_fail = on_fail + + # Negotiated parameters + self.channel_max = None + self.frame_max = None + self.heartbeat = heartbeat + + def on_connection_start(self, payload): + sasl_mechanisms = payload.mechanisms.split(b' ') + locale_supported = payload.locales.split(b' ') + + # Select a mechanism + if b'PLAIN' not in sasl_mechanisms: + raise ValueError('Server does not support PLAIN') + + self.connection.watch_for_method(0, ConnectionTune, self.on_connection_tune) + self.connection.send([ + AMQPMethodFrame(0, + ConnectionStartOk(CLIENT_DATA, b'PLAIN', + b'\x00' + self.login.encode('utf8') + b'\x00' + self.password.encode( + 'utf8'), + locale_supported[0] + )) + ]) + + def on_connection_tune(self, payload): + print('Channel max: ', payload.channel_max, 'Frame max: ', payload.frame_max, 'Heartbeat: ', payload.heartbeat) + + self.channel_max = payload.channel_max + self.frame_max = payload.frame_max + self.heartbeat = min(payload.heartbeat, self.heartbeat) + + self.connection.watch_for_method(0, ConnectionOpenOk, self.on_connection_open_ok) + self.connection.send([ + AMQPMethodFrame(0, ConnectionTuneOk(self.channel_max, self.frame_max, self.heartbeat)), + AMQPMethodFrame(0, ConnectionOpen(self.virtual_host)) + ]) + + def on_connection_open_ok(self, payload): + print('Connection opened OK!') + self.on_success() diff --git a/coolamqp/uplink/__init__.py b/coolamqp/uplink/__init__.py index a1e76abe932191273e864ea3e29099d1a7ffd94d..8405c1aa77024152f5029d49638fe9624a8ba310 100644 --- a/coolamqp/uplink/__init__.py +++ b/coolamqp/uplink/__init__.py @@ -1,15 +1,10 @@ # coding=UTF-8 """ -The layer that: - - manages serialization/deserializtion (framing) - - manages low-level data sending (streams) - - sets up connection to AMQP - - reacts and allows sending low-level AMQP commands - -This layer bears no notion of fault tolerance +The layer that allows you to attach Reactors, +ie. objects that are informed upon receiving a frame or connection dying. +They can also send frames themselves. """ from __future__ import absolute_import, division, print_function from coolamqp.uplink.connection import Connection from coolamqp.uplink.listener import ListenerThread -from coolamqp.uplink.reactor import Reactor diff --git a/coolamqp/uplink/connection/connection.py b/coolamqp/uplink/connection/connection.py index 54f200213f382684fd59dfb31a71ded2e5f03685..7d8a88dd3aa588910e37c710c44d0d1b541f6a7b 100644 --- a/coolamqp/uplink/connection/connection.py +++ b/coolamqp/uplink/connection/connection.py @@ -1,10 +1,12 @@ # coding=UTF-8 from __future__ import absolute_import, division, print_function import logging +import collections from coolamqp.uplink.listener import ListenerThread from coolamqp.uplink.connection.recv_framer import ReceivingFramer from coolamqp.uplink.connection.send_framer import SendingFramer +from coolamqp.framing.frames import AMQPMethodFrame logger = logging.getLogger(__name__) @@ -12,32 +14,31 @@ logger = logging.getLogger(__name__) class Connection(object): """ - An object that manages a connection in a comprehensive way - """ + An object that manages a connection in a comprehensive way. - def __init__(self, socketobject, listener_thread, reactor=None): - self.reactor = reactor - if reactor is None: - logger.warn('Creating connection without a reactor; hope you know what you''re doing') - else: - reactor.set_send_frame(self.send) + It allows for sending and registering watches for particular things. + """ + def __init__(self, socketobject, listener_thread): + self.listener_thread = listener_thread + self.socketobject = socketobject self.recvf = ReceivingFramer(self.on_frame) self.failed = False - self.listener_socket = listener_thread.register(socketobject, - on_read=self.recvf.put, - on_fail=self.on_fail) + self.method_watches = {} # channel => [AMQPMethodPayload instance, callback] + def start(self): + """ + Start processing events for this connect + :return: + """ + self.listener_socket = self.listener_thread.register(self.socketobject, + on_read=self.recvf.put, + on_fail=self.on_fail) self.sendf = SendingFramer(self.listener_socket.send) - def set_reactor(self, reactor): - self.reactor = reactor - reactor.set_send_frame(self.send) - def on_fail(self): self.failed = True - self.reactor.on_close() def send(self, frames): """ @@ -51,8 +52,19 @@ class Connection(object): self.failed = True def on_frame(self, frame): - if self.reactor is None: - logger.warn('Received %s but no reactor present. Dropping.', frame) - else: - self.reactor.on_frame(frame) + if isinstance(frame, AMQPMethodFrame): + if frame.channel in self.method_watches: + if isinstance(frame.payload, self.method_watches[frame.channel][0]): + method, callback = self.method_watches[frame.channel].popleft() + callback(frame.payload) + + def watch_for_method(self, channel, method, callback): + """ + :param channel: channel to monitor + :param method: AMQPMethodPayload class + :param callback: callable(AMQPMethodPayload instance) + """ + if channel not in self.method_watches: + self.method_watches[channel] = collections.deque() + self.method_watches[channel].append((method, callback)) \ No newline at end of file diff --git a/coolamqp/uplink/connection/send_framer.py b/coolamqp/uplink/connection/send_framer.py index 3986d16d1d6aa4aa2e2e9fc70adb615bf6de6f2b..bc5c9c8a2c39dd047bf116da03f195decc70b458 100644 --- a/coolamqp/uplink/connection/send_framer.py +++ b/coolamqp/uplink/connection/send_framer.py @@ -43,5 +43,8 @@ class SendingFramer(object): for frame in frames: frame.write_to(buf) + print('Writing ', repr(frame), repr(frame.payload)) - self.on_send(buf.getvalue()) + q = buf.getvalue() + print(repr(q)) + self.on_send(q) diff --git a/coolamqp/uplink/listener/epoll_listener.py b/coolamqp/uplink/listener/epoll_listener.py index 205a4901bfb2aa8ce1ad39530bec0dfa8be5a4b0..496aa0667708e0fd2d9236305dc266b976401d43 100644 --- a/coolamqp/uplink/listener/epoll_listener.py +++ b/coolamqp/uplink/listener/epoll_listener.py @@ -3,6 +3,8 @@ from __future__ import absolute_import, division, print_function import six import logging import select +import monotonic +import heapq from coolamqp.uplink.listener.socket import SocketFailed, BaseSocket @@ -29,6 +31,23 @@ class EpollSocket(BaseSocket): self.listener.epoll.modify(self, self.get_epoll_eventset()) + def oneshot(self, seconds_after, callable): + """ + Set to fire a callable N seconds after + :param seconds_after: seconds after this + :param callable: callable/0 + """ + self.listener.oneshot(self, seconds_after, callable) + + def noshot(self): + """ + Clear all time-delayed callables. + + This will make no time-delayed callables delivered if ran in listener thread + """ + self.listener.noshot(self) + + class EpollListener(object): """ A listener using epoll. @@ -37,8 +56,9 @@ class EpollListener(object): def __init__(self): self.epoll = select.epoll() self.fd_to_sock = {} + self.time_events = [] - def wait(self, timeout=0): + def wait(self, timeout=1): events = self.epoll.poll(timeout=timeout) for fd, event in events: sock = self.fd_to_sock[fd] @@ -55,10 +75,24 @@ class EpollListener(object): self.epoll.unregister(fd) del self.fd_to_sock[fd] sock.on_fail() + self.noshot(sock) sock.close() else: self.epoll.modify(fd, sock.get_epoll_eventset()) + # Timer events + while len(self.time_events) > 0 and (self.time_events[0][0] < monotonic.monotonic()): + ts, fd, callback = heapq.heappop(self.time_events) + callback() + + def noshot(self, sock): + """ + Clear all one-shots for a socket + :param sock: BaseSocket instance + """ + fd = sock.fileno() + self.time_events = [q for q in self.time_events if q[1] != fd] + def shutdown(self): """ Forcibly close all sockets that this manages (calling their on_fail's), @@ -71,6 +105,20 @@ class EpollListener(object): sock.close() self.fd_to_sock = {} self.epoll.close() + self.time_events = [] + + def oneshot(self, sock, delta, callback): + """ + A socket registers a time callback + :param sock: BaseSocket instance + :param delta: "this seconds after now" + :param callback: callable/0 + """ + if sock.fileno() in self.fd_to_sock: + heapq.heappush(self.time_events, (monotonic.monotonic() + delta, + sock.fileno(), + callback + )) def register(self, sock, on_read=lambda data: None, on_fail=lambda: None): diff --git a/coolamqp/uplink/listener/socket.py b/coolamqp/uplink/listener/socket.py index 8c34971c8a53c1e6293bc326fae5762a9cda7003..5f5639d881365cc3ed9cc3ba5c4b5c3a3d93e72b 100644 --- a/coolamqp/uplink/listener/socket.py +++ b/coolamqp/uplink/listener/socket.py @@ -18,6 +18,7 @@ class BaseSocket(object): """ def __init__(self, sock, on_read=lambda data: None, + on_time=lambda: None, on_fail=lambda: None): """ @@ -25,6 +26,7 @@ class BaseSocket(object): :param on_read: callable(data) to be called when data is read. Listener thread context Raises ValueError on socket should be closed + :param on_time: callable() when time provided by socket expires :param on_fail: callable() when socket is dead and to be discarded. Listener thread context. Socket descriptor will be handled by listener. @@ -35,6 +37,7 @@ class BaseSocket(object): self.data_to_send = collections.deque() self.my_on_read = on_read self.on_fail = on_fail + self.on_time = on_time def send(self, data): """ @@ -44,6 +47,22 @@ class BaseSocket(object): """ raise Exception('Abstract; listener should override that') + def oneshot(self, seconds_after, callable): + """ + Set to fire a callable N seconds after + :param seconds_after: seconds after this + :param callable: callable/0 + """ + raise Exception('Abstract; listener should override that') + + def noshot(self): + """ + Clear all time-delayed callables. + + This will make no time-delayed callables delivered if ran in listener thread + """ + raise Exception('Abstract; listener should override that') + def on_read(self): """Socket is readable, called by Listener""" try: @@ -51,13 +70,14 @@ class BaseSocket(object): except (IOError, socket.error): raise SocketFailed() + print('Got ',repr(data)) + if len(data) == 0: raise SocketFailed() try: self.my_on_read(data) except ValueError: - raise #debug raise SocketFailed() def on_write(self): diff --git a/coolamqp/uplink/listener/thread.py b/coolamqp/uplink/listener/thread.py index f9c9572ea4d9961fb72a7a8adc7aa516f5169ad5..40abd2449db5201e4e5b5b3e3fa26b1db94689b7 100644 --- a/coolamqp/uplink/listener/thread.py +++ b/coolamqp/uplink/listener/thread.py @@ -37,4 +37,4 @@ class ListenerThread(threading.Thread): :return: a BaseSocket instance to use instead of this socket """ - return self.listener.register(sock, on_read, on_fail) \ No newline at end of file + return self.listener.register(sock, on_read, on_fail) diff --git a/coolamqp/uplink/reactor.py b/coolamqp/uplink/reactor.py deleted file mode 100644 index ef811420c10ecc49d327be73b54cc3a47c4b6b55..0000000000000000000000000000000000000000 --- a/coolamqp/uplink/reactor.py +++ /dev/null @@ -1,35 +0,0 @@ -# coding=UTF-8 -from __future__ import absolute_import, division, print_function - - - -class Reactor(object): - """ - Base class for objects that can: - - Receive AMQPFrame's - - Send AMQPFrame's - - Receive information about termination of connection - - Default implementation is a no-op default reactor. - """ - def __init__(self): - self.send_frame = lambda frame: None - - - def on_frame(self, frame): - """ - Frame was received. - :param frame: AMQPFrame instance - """ - - def set_send_frame(self, sender): - """ - Called when Reactor is registered in a Connection - :param sender: callable(amqp_frame) to call if Reactor wants to send a frame - """ - self.send_frame = sender - - def on_close(self): - """ - Called when connection is closed - """ diff --git a/tests/run.py b/tests/run.py new file mode 100644 index 0000000000000000000000000000000000000000..fb842333a3cbb04970f3590a4b9ef1911d688da7 --- /dev/null +++ b/tests/run.py @@ -0,0 +1,29 @@ +# coding=UTF-8 +from __future__ import absolute_import, division, print_function +from coolamqp.uplink import ListenerThread, Connection +import socket +import time + + +def newc(): + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s.connect(('127.0.0.1', 5672)) + s.settimeout(0) + s.send('AMQP\x00\x00\x09\x01') + return s + + +from coolamqp.handshake import Handshaker + +if __name__ == '__main__': + lt = ListenerThread() + lt.start() + + con = Connection(newc(), lt) + + handshaker = Handshaker(con, 'user', 'user', '/') + con.start() + + time.sleep(5) + + lt.terminate() diff --git a/tests/test_framing/test_definitions/test_frames.py b/tests/test_framing/test_definitions/test_frames.py index 80caa168222ca5c7bec12aa008cc2ea83e578159..c805647442bd32414d4d2c0bc9a412b1fead5f06 100644 --- a/tests/test_framing/test_definitions/test_frames.py +++ b/tests/test_framing/test_definitions/test_frames.py @@ -4,7 +4,7 @@ import unittest import io import struct from coolamqp.framing.frames import AMQPHeaderFrame -from coolamqp.framing.definitions import BasicContentPropertyList, FRAME_HEADER, FRAME_END +from coolamqp.framing.definitions import BasicContentPropertyList, FRAME_HEADER, FRAME_END, ConnectionStartOk class TestShitSerializesRight(unittest.TestCase): @@ -42,3 +42,5 @@ class TestShitSerializesRight(unittest.TestCase): self.assertEquals(buf.getvalue(), ) + + diff --git a/tests/test_framing/test_field_table.py b/tests/test_framing/test_field_table.py new file mode 100644 index 0000000000000000000000000000000000000000..0f3f7ccbb82889b6f48322f10bda69aaa916a499 --- /dev/null +++ b/tests/test_framing/test_field_table.py @@ -0,0 +1,46 @@ +# coding=UTF-8 +from __future__ import absolute_import, division, print_function +import unittest +import struct +import io + +from coolamqp.framing.field_table import enframe_table, deframe_table, frame_table_size, \ + enframe_field_value, deframe_field_value, frame_field_value_size + + +class TestFramingTables(unittest.TestCase): + def test_frame_unframe_table(self): + + tab = [ + (b'field', (b'yo', b's')) + ] + + buf = io.BytesIO() + + self.assertEquals(frame_table_size(tab), 4+6+4) + + enframe_table(buf, tab) + buf = buf.getvalue() + + self.assertEquals(buf, struct.pack('!I', 10) + b'\x05fields\x02yo') + + tab, delta = deframe_table(buffer(buf), 0) + + self.assertEquals(len(tab), 1) + self.assertEquals(delta, 14) + self.assertEquals(tab[0], (b'field', (b'yo', b's'))) + + def test_frame_unframe_value(self): + + buf = io.BytesIO() + + enframe_field_value(buf, (b'yo', b's')) + + buf = buf.getvalue() + self.assertEquals(b's\x02yo', buf) + + fv, delta = deframe_field_value(buffer(buf), 0) + self.assertEquals(fv, (b'yo', b's')) + self.assertEquals(delta, 4) + + self.assertEquals(frame_field_value_size(fv), 4) diff --git a/tests/test_uplink/test_basic.py b/tests/test_uplink/test_basic.py index 357b57dcee32f08c024f5870c72af6036f53885e..81c3a4cfe2cf441c5258bca5794acadaf105ff5b 100644 --- a/tests/test_uplink/test_basic.py +++ b/tests/test_uplink/test_basic.py @@ -2,8 +2,8 @@ from __future__ import absolute_import, division, print_function import unittest -from coolamqp.framing.definitions import ConnectionStart -from coolamqp.uplink import ListenerThread, Connection, Reactor +from coolamqp.handshake import Handshaker +from coolamqp.uplink import ListenerThread, Connection import socket import time @@ -16,24 +16,22 @@ def newc(): return s -class LolReactor(Reactor): - def __init__(self): - Reactor.__init__(self) - self.got_connectionstart = False - - def on_frame(self, frame): - if isinstance(frame.payload, ConnectionStart): - self.got_connectionstart = True - - class TestBasic(unittest.TestCase): def test_gets_connectionstart(self): + + hnd_ok = {'ok': False} + def hnd_suc(): + hnd_ok['ok'] = True + lt = ListenerThread() lt.start() - r = LolReactor() - con = Connection(newc(), lt, r) + + con = Connection(newc(), lt) + + Handshaker(con, 'user', 'user', '/', hnd_suc, lambda: None) + con.start() time.sleep(5) lt.terminate() - self.assertTrue(r.got_connectionstart) \ No newline at end of file + self.assertTrue(hnd_ok['ok'])