diff --git a/coolamqp/framing/connector.py b/coolamqp/framing/connector.py deleted file mode 100644 index 8dcc9aa51b3475d9988ce6aa545aba1ac3133db1..0000000000000000000000000000000000000000 --- a/coolamqp/framing/connector.py +++ /dev/null @@ -1,12 +0,0 @@ -# coding=UTF-8 -from __future__ import absolute_import, division, print_function - - -def connect(host, port): - """ - Given a host and port, return a socket - that - :param host: - :param port: - :return: - """ \ No newline at end of file diff --git a/coolamqp/framing/react/__init__.py b/coolamqp/framing/react/__init__.py deleted file mode 100644 index 1ecfda3bb4a648582888bc86871de56daba9fce3..0000000000000000000000000000000000000000 --- a/coolamqp/framing/react/__init__.py +++ /dev/null @@ -1,6 +0,0 @@ -# coding=UTF-8 -from __future__ import absolute_import, division, print_function - -""" - -""" \ No newline at end of file diff --git a/coolamqp/framing/streams/__init__.py b/coolamqp/framing/streams/__init__.py deleted file mode 100644 index 7670bd2bf36d35609e06e53cd6455f221b39b897..0000000000000000000000000000000000000000 --- a/coolamqp/framing/streams/__init__.py +++ /dev/null @@ -1,12 +0,0 @@ -# coding=UTF-8 -""" -Classes that allow to receive and send frames in a rapid way, -and manage low-level connection details. - -These modules bear no notion of fault-tolerance. -""" -from __future__ import absolute_import, division, print_function - -from coolamqp.framing.streams.recv_formatter import ReceivingFormatter - - diff --git a/coolamqp/framing/streams/uplink.py b/coolamqp/framing/streams/uplink.py deleted file mode 100644 index e938afa5fedd781bfc59925b7dc6aa37f223e1c4..0000000000000000000000000000000000000000 --- a/coolamqp/framing/streams/uplink.py +++ /dev/null @@ -1,27 +0,0 @@ -# coding=UTF-8 -from __future__ import absolute_import, division, print_function -import collections -import socket - -class Uplink(object): - """ - This coordinates access to a shared socket. - - This should be discarded when the TCP connection dies. - """ - - def __init__(self, sock): - """ - Pass a fresh socket, just a - :param sock: - """ - self.sock = sock - self.sock.settimeout(0) - self.sock.setsockopt(socket.SOL_SOCKET, socket.TCP_NODELAY, 1) # disable Nagle - - # when a method frame comes in, it is checked here first. - # if there's a match, that means a sync request completed. - self.waiting_queue = collections.defaultdict() - - - diff --git a/coolamqp/framing/__init__.py b/coolamqp/uplink/__init__.py similarity index 100% rename from coolamqp/framing/__init__.py rename to coolamqp/uplink/__init__.py diff --git a/coolamqp/framing/streams/exceptions.py b/coolamqp/uplink/authentication.py similarity index 71% rename from coolamqp/framing/streams/exceptions.py rename to coolamqp/uplink/authentication.py index 0637f9fc106149cfdae552f9577421133e3ead3a..7d799de9fe82a818cfa13db8bc8bc411c8cd7feb 100644 --- a/coolamqp/framing/streams/exceptions.py +++ b/coolamqp/uplink/authentication.py @@ -1,4 +1,4 @@ # coding=UTF-8 +"""All things authentication""" from __future__ import absolute_import, division, print_function - diff --git a/coolamqp/uplink/commander/__init__.py b/coolamqp/uplink/commander/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..129d19e03c828885baf8db66a450977d7282dceb --- /dev/null +++ b/coolamqp/uplink/commander/__init__.py @@ -0,0 +1,13 @@ +# coding=UTF-8 +""" +Classes that control what Uplink does +""" +from __future__ import absolute_import, division, print_function + + +class BaseCommander(object): + """ + + + This is invoked by user + """ \ No newline at end of file diff --git a/coolamqp/uplink/factory.py b/coolamqp/uplink/factory.py new file mode 100644 index 0000000000000000000000000000000000000000..337659bee19e11fbcc2e0d1b9059cadd6ea1511a --- /dev/null +++ b/coolamqp/uplink/factory.py @@ -0,0 +1,44 @@ +# coding=UTF-8 +""" +Set of objects and functions whose job is to construct an Uplink +instance capable of further action and bootstrap. +""" +from __future__ import absolute_import, division, print_function + +from coolamqp.uplink.frames.base import AMQP_HELLO_HEADER + +import socket + + +def connect(host, port, connect_timeout=10, + general_timeout=5): + """ + Return a TCP socket connected to broker. + + Socket should be in the state of 'Awaiting Connection.Start' + + This may block for up to connect_timeout seconds. + + When returned, this socket will be in the state of awaiting + an + + :param host: host to connect to + :type host: text + :param port: port to connect to + :type port: int + :return: a CONNECTED socket or None, if connection failed. + """ + + try: + s = socket(socket.AF_INET, socket.SOCK_STREAM) + s.settimeout(connect_timeout) + s.setsockopt(socket.SOL_SOCKET, socket.TCP_NODELAY, 1) + s.connect((host, port)) + s.send(AMQP_HELLO_HEADER) + s.settimeout(0) + except (IOError, socket.error, socket.timeout): + try: + s.close() + except: + pass + return None diff --git a/coolamqp/framing/frames/__init__.py b/coolamqp/uplink/frames/__init__.py similarity index 68% rename from coolamqp/framing/frames/__init__.py rename to coolamqp/uplink/frames/__init__.py index 425e564e29a5ebe05404e9edb401d7cffd7e94ee..dd8748f57c4aeca1bb21fce23d0cc98c3fc73d8b 100644 --- a/coolamqp/framing/frames/__init__.py +++ b/coolamqp/uplink/frames/__init__.py @@ -3,7 +3,8 @@ from __future__ import absolute_import, division, print_function """ Definitions of frames. -Mechanisms for serialization/deserialization of AMQP frames. +Mechanisms for serialization/deserialization of AMQP frames and other types. + definitions.py is machine-generated from AMQP specification. """ \ No newline at end of file diff --git a/coolamqp/framing/frames/base.py b/coolamqp/uplink/frames/base.py similarity index 100% rename from coolamqp/framing/frames/base.py rename to coolamqp/uplink/frames/base.py diff --git a/coolamqp/framing/frames/base_definitions.py b/coolamqp/uplink/frames/base_definitions.py similarity index 93% rename from coolamqp/framing/frames/base_definitions.py rename to coolamqp/uplink/frames/base_definitions.py index 0afd83c17c63a9c5e72ae38ac364a54af77b7a70..befb6426c3df00a2dd538377a5b9efd89fca3263 100644 --- a/coolamqp/framing/frames/base_definitions.py +++ b/coolamqp/uplink/frames/base_definitions.py @@ -6,7 +6,7 @@ from __future__ import absolute_import, division, print_function import struct -from coolamqp.framing.frames.base import AMQPPayload +from coolamqp.uplink.frames.base import AMQPPayload @@ -31,7 +31,7 @@ class AMQPMethodPayload(AMQPPayload): Write own content to target buffer - starting from LENGTH, ending on FRAME_END :param buf: target buffer """ - from coolamqp.framing.frames.definitions import FRAME_END + from coolamqp.uplink.frames.definitions import FRAME_END if self.IS_CONTENT_STATIC: buf.write(self.STATIC_CONTENT) diff --git a/coolamqp/framing/frames/compilation/__init__.py b/coolamqp/uplink/frames/compilation/__init__.py similarity index 100% rename from coolamqp/framing/frames/compilation/__init__.py rename to coolamqp/uplink/frames/compilation/__init__.py diff --git a/coolamqp/framing/frames/compilation/compile_definitions.py b/coolamqp/uplink/frames/compilation/compile_definitions.py similarity index 98% rename from coolamqp/framing/frames/compilation/compile_definitions.py rename to coolamqp/uplink/frames/compilation/compile_definitions.py index 430440d115f9e91e635e0c1fab3688a6cb5135ef..56ec91b6e5e60a1d3f693a422c82f086035e7359 100644 --- a/coolamqp/framing/frames/compilation/compile_definitions.py +++ b/coolamqp/uplink/frames/compilation/compile_definitions.py @@ -5,15 +5,15 @@ import struct import six import math -from coolamqp.framing.frames.compilation.utilities import get_constants, get_classes, get_domains, \ +from coolamqp.uplink.frames.compilation.utilities import get_constants, get_classes, get_domains, \ byname, name_class, name_method, name_field, ffmt, doxify, infertype, normname, as_nice_escaped_string, \ frepr -from coolamqp.framing.frames.base import BASIC_TYPES +from coolamqp.uplink.frames.base import BASIC_TYPES TYPE_TRANSLATOR = { 'shortstr': 'binary type (max length 255)', 'longstr': 'binary type', - 'table': 'table. See coolamqp.framing.frames.field_table', + 'table': 'table. See coolamqp.uplink.frames.field_table', 'bit': 'bool', 'octet': 'int, 8 bit unsigned', 'short': 'int, 16 bit unsigned', @@ -22,7 +22,7 @@ TYPE_TRANSLATOR = { 'timestamp': '64 bit signed POSIX timestamp (in seconds)', } -def compile_definitions(xml_file='resources/amqp0-9-1.xml', out_file='coolamqp/framing/frames/definitions.py'): +def compile_definitions(xml_file='resources/amqp0-9-1.xml', out_file='coolamqp/uplink/frames/definitions.py'): """parse resources/amqp-0-9-1.xml into """ xml = ElementTree.parse(xml_file) @@ -34,7 +34,7 @@ from __future__ import print_function, absolute_import A Python version of the AMQP machine-readable specification. Generated automatically by CoolAMQP from AMQP machine-readable specification. -See coolamqp.framing.frames.compilation for the tool +See coolamqp.uplink.frames.compilation for the tool AMQP is copyright (c) 2016 OASIS CoolAMQP is copyright (c) 2016 DMS Serwis s.c. @@ -42,8 +42,8 @@ CoolAMQP is copyright (c) 2016 DMS Serwis s.c. import struct -from coolamqp.framing.frames.base_definitions import AMQPClass, AMQPMethodPayload, AMQPContentPropertyList -from coolamqp.framing.frames.field_table import enframe_table, deframe_table, frame_table_size +from coolamqp.uplink.frames.base_definitions import AMQPClass, AMQPMethodPayload, AMQPContentPropertyList +from coolamqp.uplink.frames.field_table import enframe_table, deframe_table, frame_table_size ''') diff --git a/coolamqp/framing/frames/compilation/utilities.py b/coolamqp/uplink/frames/compilation/utilities.py similarity index 98% rename from coolamqp/framing/frames/compilation/utilities.py rename to coolamqp/uplink/frames/compilation/utilities.py index a0bdd86641969e39b981a55b073e09146137c40b..90c8c22cd48ebcda23e8338c79a15cbda12e552b 100644 --- a/coolamqp/framing/frames/compilation/utilities.py +++ b/coolamqp/uplink/frames/compilation/utilities.py @@ -4,7 +4,7 @@ from collections import namedtuple import six import math -from coolamqp.framing.frames.base import BASIC_TYPES, DYNAMIC_BASIC_TYPES +from coolamqp.uplink.frames.base import BASIC_TYPES, DYNAMIC_BASIC_TYPES # docs may be None diff --git a/coolamqp/framing/frames/definitions.py b/coolamqp/uplink/frames/definitions.py similarity index 99% rename from coolamqp/framing/frames/definitions.py rename to coolamqp/uplink/frames/definitions.py index c9001741a267ca2bf79a8184f91af5205cc3987d..b369497deb3f23328e48fd98efdcbf54cdfda80a 100644 --- a/coolamqp/framing/frames/definitions.py +++ b/coolamqp/uplink/frames/definitions.py @@ -4,7 +4,7 @@ from __future__ import print_function, absolute_import A Python version of the AMQP machine-readable specification. Generated automatically by CoolAMQP from AMQP machine-readable specification. -See coolamqp.framing.frames.compilation for the tool +See coolamqp.uplink.frames.compilation for the tool AMQP is copyright (c) 2016 OASIS CoolAMQP is copyright (c) 2016 DMS Serwis s.c. @@ -12,8 +12,8 @@ CoolAMQP is copyright (c) 2016 DMS Serwis s.c. import struct -from coolamqp.framing.frames.base_definitions import AMQPClass, AMQPMethodPayload, AMQPContentPropertyList -from coolamqp.framing.frames.field_table import enframe_table, deframe_table, frame_table_size +from coolamqp.uplink.frames.base_definitions import AMQPClass, AMQPMethodPayload, AMQPContentPropertyList +from coolamqp.uplink.frames.field_table import enframe_table, deframe_table, frame_table_size # Core constants FRAME_METHOD = 1 @@ -435,7 +435,7 @@ class ConnectionStart(AMQPMethodPayload): "version", giving the name of the server version, "platform", giving the name of the operating system, "copyright", if appropriate, and "information", giving other general information. - :type server_properties: table. See coolamqp.framing.frames.field_table (peer-properties in AMQP) + :type server_properties: table. See coolamqp.uplink.frames.field_table (peer-properties in AMQP) :param mechanisms: Available security mechanisms A list of the security mechanisms that the server supports, delimited by spaces. :type mechanisms: binary type (longstr in AMQP) @@ -588,7 +588,7 @@ class ConnectionStartOk(AMQPMethodPayload): of the client product, "version", giving the name of the client version, "platform", giving the name of the operating system, "copyright", if appropriate, and "information", giving other general information. - :type client_properties: table. See coolamqp.framing.frames.field_table (peer-properties in AMQP) + :type client_properties: table. See coolamqp.uplink.frames.field_table (peer-properties in AMQP) :param mechanism: Selected security mechanism A single security mechanisms selected by the client, which must be one of those specified by the server. @@ -1346,7 +1346,7 @@ class ExchangeDeclare(AMQPMethodPayload): :param arguments: Arguments for declaration A set of arguments for the declaration. The syntax and semantics of these arguments depends on the server implementation. - :type arguments: table. See coolamqp.framing.frames.field_table (table in AMQP) + :type arguments: table. See coolamqp.uplink.frames.field_table (table in AMQP) """ self.exchange = exchange self.type = type @@ -1663,7 +1663,7 @@ class QueueBind(AMQPMethodPayload): :param arguments: Arguments for binding A set of arguments for the binding. The syntax and semantics of these arguments depends on the exchange class. - :type arguments: table. See coolamqp.framing.frames.field_table (table in AMQP) + :type arguments: table. See coolamqp.uplink.frames.field_table (table in AMQP) """ self.queue = queue self.exchange = exchange @@ -1833,7 +1833,7 @@ class QueueDeclare(AMQPMethodPayload): :param arguments: Arguments for declaration A set of arguments for the declaration. The syntax and semantics of these arguments depends on the server implementation. - :type arguments: table. See coolamqp.framing.frames.field_table (table in AMQP) + :type arguments: table. See coolamqp.uplink.frames.field_table (table in AMQP) """ self.queue = queue self.passive = passive @@ -2267,7 +2267,7 @@ class QueueUnbind(AMQPMethodPayload): :type routing_key: binary type (max length 255) (shortstr in AMQP) :param arguments: Arguments of binding Specifies the arguments of the binding to unbind. - :type arguments: table. See coolamqp.framing.frames.field_table (table in AMQP) + :type arguments: table. See coolamqp.uplink.frames.field_table (table in AMQP) """ self.queue = queue self.exchange = exchange @@ -2395,7 +2395,7 @@ class BasicContentPropertyList(AMQPContentPropertyList): :param content_encoding: MIME content encoding :type content_encoding: binary type (max length 255) (shortstr in AMQP) :param headers: message header field table - :type headers: table. See coolamqp.framing.frames.field_table (table in AMQP) + :type headers: table. See coolamqp.uplink.frames.field_table (table in AMQP) :param delivery_mode: non-persistent (1) or persistent (2) :type delivery_mode: int, 8 bit unsigned (octet in AMQP) :param priority: message priority, 0 to 9 @@ -2633,7 +2633,7 @@ class BasicConsume(AMQPMethodPayload): :param arguments: Arguments for declaration A set of arguments for the consume. The syntax and semantics of these arguments depends on the server implementation. - :type arguments: table. See coolamqp.framing.frames.field_table (table in AMQP) + :type arguments: table. See coolamqp.uplink.frames.field_table (table in AMQP) """ self.queue = queue self.consumer_tag = consumer_tag diff --git a/coolamqp/framing/frames/field_table.py b/coolamqp/uplink/frames/field_table.py similarity index 100% rename from coolamqp/framing/frames/field_table.py rename to coolamqp/uplink/frames/field_table.py diff --git a/coolamqp/framing/frames/frames.py b/coolamqp/uplink/frames/frames.py similarity index 94% rename from coolamqp/framing/frames/frames.py rename to coolamqp/uplink/frames/frames.py index 7e51b7b8bcd6eebcfaa18a06c8474cf54d41c68e..84a74c8e287917dc43c78d8956a8bb3567e8f529 100644 --- a/coolamqp/framing/frames/frames.py +++ b/coolamqp/uplink/frames/frames.py @@ -6,8 +6,8 @@ from __future__ import absolute_import, division, print_function import struct -from coolamqp.framing.frames.base import AMQPFrame -from coolamqp.framing.frames.definitions import FRAME_METHOD, FRAME_HEARTBEAT, FRAME_BODY, FRAME_HEADER, FRAME_END, \ +from coolamqp.uplink.frames.base import AMQPFrame +from coolamqp.uplink.frames.definitions import FRAME_METHOD, FRAME_HEARTBEAT, FRAME_BODY, FRAME_HEADER, FRAME_END, \ IDENT_TO_METHOD diff --git a/coolamqp/framing/order.py b/coolamqp/uplink/order.py similarity index 100% rename from coolamqp/framing/order.py rename to coolamqp/uplink/order.py diff --git a/coolamqp/uplink/streams/__init__.py b/coolamqp/uplink/streams/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..772cb445da18001cf14a4324fe9cb776f6b2e966 --- /dev/null +++ b/coolamqp/uplink/streams/__init__.py @@ -0,0 +1,9 @@ +# coding=UTF-8 +""" +Classes that allow to receive and send frames +REASONABLY FAST, because they use buffers and stuff. +""" +from __future__ import absolute_import, division, print_function + +from coolamqp.uplink.streams.recv_formatter import ReceivingFormatter +from coolamqp.uplink.streams.send_operator import SendingOperator diff --git a/coolamqp/uplink/streams/reader_thread.py b/coolamqp/uplink/streams/reader_thread.py new file mode 100644 index 0000000000000000000000000000000000000000..f5870f34d5811395669926f2ec79486a61bef122 --- /dev/null +++ b/coolamqp/uplink/streams/reader_thread.py @@ -0,0 +1,34 @@ +# coding=UTF-8 +from __future__ import absolute_import, division, print_function +import socket +import threading + + +class ReaderThread(threading.Thread): + """ + A thread, whose job is to receive AMQP frames from AMQP TCP socket. + + Thread may inform Uplink of socket's lossage via on_socket_failed(exception). It should exit afterwards at once. + """ + + def __init__(self, sock, on_failure): + """ + :param uplink: Uplink instance + :param sock: a socket to use + """ + threading.Thread.__init__(self) + self.daemon = True + + self.uplink = uplink + self.sock = sock + self.is_cancelled = False + + + def on_cancel(self): + """ + Called by Uplink when it decides that we should not report any more frames, and should just die. + """ + self.is_cancelled = True + + + def run(self):_ \ No newline at end of file diff --git a/coolamqp/framing/streams/recv_formatter.py b/coolamqp/uplink/streams/recv_formatter.py similarity index 93% rename from coolamqp/framing/streams/recv_formatter.py rename to coolamqp/uplink/streams/recv_formatter.py index 5a573b3960b88c80e7cd945a1fb3abfaecc14ba3..528cfcb0790ef68e2c07d24c4e6117c6d3db5462 100644 --- a/coolamqp/framing/streams/recv_formatter.py +++ b/coolamqp/uplink/streams/recv_formatter.py @@ -6,9 +6,9 @@ import six import collections import socket -from coolamqp.framing.frames.definitions import FRAME_HEADER, FRAME_HEARTBEAT, FRAME_END, FRAME_METHOD, FRAME_BODY -from coolamqp.framing.frames.frames import AMQPBodyFrame, AMQPHeaderFrame, AMQPHeartbeatFrame, AMQPMethodFrame -from coolamqp.framing.streams.exceptions import InvalidDataError +from coolamqp.uplink.frames.definitions import FRAME_HEADER, FRAME_HEARTBEAT, FRAME_END, FRAME_METHOD, FRAME_BODY +from coolamqp.uplink.frames.frames import AMQPBodyFrame, AMQPHeaderFrame, AMQPHeartbeatFrame, AMQPMethodFrame +from coolamqp.uplink.streams.exceptions import InvalidDataError FRAME_TYPES = { diff --git a/coolamqp/framing/streams/send_operator.py b/coolamqp/uplink/streams/send_operator.py similarity index 99% rename from coolamqp/framing/streams/send_operator.py rename to coolamqp/uplink/streams/send_operator.py index 1161ee429028ba1315acd105709c9119d87cc22b..b76a0f6d57930da25413ab8dab3e15b95c72e2a1 100644 --- a/coolamqp/framing/streams/send_operator.py +++ b/coolamqp/uplink/streams/send_operator.py @@ -7,7 +7,6 @@ import io import socket - class SendingOperator(object): """ Assembles AMQP frames from received data and orchestrates their upload via a socket. @@ -43,7 +42,6 @@ class SendingOperator(object): self.failed = False - def _failed(self, e): """Discard all to_send, run on_fail callables""" self.failed = True diff --git a/coolamqp/uplink/streams/watchman.py b/coolamqp/uplink/streams/watchman.py new file mode 100644 index 0000000000000000000000000000000000000000..e244fe45cf8f09f8a2672083b4562958170d34d3 --- /dev/null +++ b/coolamqp/uplink/streams/watchman.py @@ -0,0 +1,84 @@ +# coding=UTF-8 +from __future__ import absolute_import, division, print_function +import collections +from six.moves import queue + + +class Watchman(object): + """ + Watchman is a guy that you can: + + - If you are ReaderThread, ask "hey, this frame just arrived, + is someone interested in it?" + - If you are frontend thread, can ask watchman to trigger a callback + when a particular frame arrives (or a bunch of them, if your request + expects a bunch). + + Since Watchman receives all frames from ReaderThread, it also knows about: + - channels being opened + - channels being closed by exception + - + + Watchman is also being informed about new channels being opened + + See page 19 of the AMQP specification. Responses for synchronous methods + arrive in order, ie. if I requested a queue.declare, and then an + exchange.declare, I expect that responses will arrive in order of + queue.declare-ok, exchange.declare-ok. + + This is important, because if we have a message, we need to check only + the first watch. + + :FRAMES = [AMQPMethodPayloadClass1, AMQPMethodPayloadClass2, ...] + :ONFRAME = callable(AMQPMethodPayload instance) + :ONDEAD = callable() | None + + :WATCH = ( :FRAMES :ONFRAME :ONDEAD ) + :WATCH_TO_LOAD = ( channel :FRAMES :ONFRAME :ONDEAD) + + + !!!!! LIMITED RIGHT NOW ONLY TO METHOD FRAMES + """ + def __init__(self): + self.watches_to_load = queue.Queue() # put new watches here + + self.watches = {} # channel => list of :WATCH + + + + def _analyze_watches(self): + + def on_frame(self, frame): + """ + A frame arrived. If this triggers a watch, trigger it and remove. + All frames received by ReaderThread go thru here. + + TO BE CALLED BY READER THREAD + + :param frame: AMQPFrame of any subtype + """ + + # Analyze pending watches + while self.watches_to_load.qsize() > 0: + channel, + + + def set_watch(self, channel, frame_types, on_frame, on_dead=None): + """ + Set a watch. Watch will fire if I see a method frame of type + found in the iterable of frame_types. + + TO BE CALLED BY FRONTEND THREAD. + + :param channel: channel to set watch on + :param frame_types: list of AMQPMethodPayload classes + :param on_frame: callable(AMQPMethodPayload instance) + :param on_dead: callable/0 to call if this watch will + for sure not be processed (eg. channel or connection died) + """ + self.watches_to_load.put((channel, frame_types, on_frame, on_dead)) + if channel not in self.watches: + p = collections.deque((frame_types, on_frame, on_dead)) + self.watches[channel] = p + else: + self.watches[channel].put diff --git a/coolamqp/uplink/uplink.py b/coolamqp/uplink/uplink.py new file mode 100644 index 0000000000000000000000000000000000000000..905e5f43e028c4477ec65728caa6cd1b4930c1cb --- /dev/null +++ b/coolamqp/uplink/uplink.py @@ -0,0 +1,58 @@ +# coding=UTF-8 +from __future__ import absolute_import, division, print_function +import collections +from six.moves import queue + + +from coolamqp.uplink.frames.base_definitions import AMQPMethodPayload + + +class Uplink(object): + """ + Uplink, the frame relay manager. + + It coordinates the joint effort of all the classes in this module. + + Uplink brings a thread to life - reader_thread - those job is to recv() on socket. + + + + There are two principal threads that can call Uplink. + 1) CoolAMQP frontend thread: this is the one processing events in the system. This thread + is the only one allowed to send(), and will frequently block on it. + + Well, you can't process more tasks if you are hanging on send, are you. + + 2) Reader thread, spawned and managed by Uplink. This is the thread running recv() on the socket, + calling the callbacks that you register, and so on. + + """ + + def __init__(self): + # Watchers + # Watcher is a one-time trigger set on one (or more) method frames. + self.watchers_to_register = queue.Queue() + pass + + + + def listen_for_method_frame(self, frames, on_frame=lambda f: None, on_dead=None): + """ + Register a one-time listener for a particular method frame (or may kinds of frame). + + When one matching frame arrives, on_frame will be called and rest of subscriptions will be evicted. + + The callback will be executed in Uplink's internal thread context. + + :param frames: + :param on_frame: + :param on_dead: + :return: + """ + + def + + + + def on_socket_failed(self, exc): + """Called by ReaderThread when it detects that socket has failed""" \ No newline at end of file diff --git a/setup.py b/setup.py index 220c7eaba287e8f1e68975ed38c3ab6214cab6b9..03c8c95d3313c5abd6860e31f509865ee9cecfa0 100644 --- a/setup.py +++ b/setup.py @@ -13,7 +13,11 @@ setup(name='CoolAMQP', packages=[ 'coolamqp', 'coolamqp.backends', - 'coolamqp.framing', + 'coolamqp.uplink', + 'coolamqp.uplink.frames', + 'coolamqp.uplink.frames.compilation', + 'coolamqp.uplink.streams', + ], license='MIT License', long_description=u'The AMQP client that handles reconnection madness for you',