diff --git a/.travis.yml b/.travis.yml index 7e506eb4312d9b9b2d1c155899339da3f8eb5493..f42755e6d4c3df6cdf5b16c155f6b7a2bfd033c1 100644 --- a/.travis.yml +++ b/.travis.yml @@ -13,10 +13,12 @@ before_script: - curl -L https://codeclimate.com/downloads/test-reporter/test-reporter-latest-linux-amd64 > ./cc-test-reporter - chmod +x ./cc-test-reporter - ./cc-test-reporter before-build + - pip install nose2 coverage script: - - python -m compile_definitions - - python setup.py test - - python -m stress_tests + - coverage run -m compile_definitions + - coverage run --append -m nose2 -vv + - COOLAMQP_FORCE_SELECT_LISTENER=1 coverage run --append -m nose2 -vv + - coverage run --append -m stress_tests install: - python setup.py install - pip install -r stress_tests/requirements.txt diff --git a/CHANGELOG.md b/CHANGELOG.md index a00c02e5499702a36040a028957355c6253b932d..bff2952a61afe946a62cb69ff2b83ca0a449b311 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,7 @@ # v1.1.1 -* _TBA_ +* added `SelectListener`, which means CoolAMQP can + run now on Windows and in gevent environments # v1.1 diff --git a/README.md b/README.md index 48156d738288e67e918e729fd4e0eae73545acd7..112945235133f06a9518ddd9c27a0259f9fb08af 100644 --- a/README.md +++ b/README.md @@ -46,11 +46,13 @@ _Watch out for memoryviews!_ They're here to stay. Assertions are sprinkled throughout the code. You may wish to run with optimizations enabled if you need every CPU cycle you can get. +Note that if you define the environment variable of `COOLAMQP_FORCE_SELECT_LISTENER`, +CoolAMQP will use select-based networking instead of epoll based. + ## Current limitations * channel flow mechanism is not supported (#11) * _confirm=True_ is not available if you're not RabbitMQ (#8) -* no Windows support (#9) ## Copyright holder change diff --git a/compile_definitions/xml_fields.py b/compile_definitions/xml_fields.py index c3c6808fd1670b96189cbc0384bbcbef71f23db5..48be6c68c8c5e91e70de4dff3592d76198cc789b 100644 --- a/compile_definitions/xml_fields.py +++ b/compile_definitions/xml_fields.py @@ -32,7 +32,7 @@ class BaseField(object): self.field_name = field_name def find(self, elem): - raise NotImplementedError('abstract') + raise NotImplementedError('Abstract method!') class ComputedField(BaseField): diff --git a/coolamqp/__init__.py b/coolamqp/__init__.py index 7d663af291c59eb43d683d057ec910fbcc9e10c8..d1d9708c88843edb0f0e12c4c073c083d06e429e 100644 --- a/coolamqp/__init__.py +++ b/coolamqp/__init__.py @@ -1,2 +1,2 @@ # coding=UTF-8 -__version__ = '1.1.1_a2' +__version__ = '1.1.1' diff --git a/coolamqp/attaches/channeler.py b/coolamqp/attaches/channeler.py index 3338e0a30b1cfca0bfd04a363dd36e49299b709a..b22bb479ed7e1597bee406e3b2524593db5ee55e 100644 --- a/coolamqp/attaches/channeler.py +++ b/coolamqp/attaches/channeler.py @@ -199,10 +199,10 @@ class Channeler(Attache): assert self.channel_id is not None return self.connection.watch_for_method(self.channel_id, method, callback, on_fail=on_fail) - def method_and_watch(self, method_payload, method_classes_to_watch, - callable): - # type: (coolamqp.framing.base.AMQPMethodPayload, - # tp.Iterable[type], tp.Callable[[coolamqp.framing.base.AMQPMethodPayload], None]) -> None + def method_and_watch(self, method_payload, # type: coolamqp.framing.base.AMQPMethodPayload, + method_classes_to_watch, # type: tp.Iterable[tp.Type[AMQPMethodPayload]] + callable # type: tp.Callable[[AMQPMethodPayload], None] + ): # type: () -> None """ Syntactic sugar for diff --git a/coolamqp/attaches/publisher.py b/coolamqp/attaches/publisher.py index 09dd41ccff0b2d037ed722302bc9d13c7803e134..a93b5d9bae4a5620de2e62e46c3acacbdb2cbcba 100644 --- a/coolamqp/attaches/publisher.py +++ b/coolamqp/attaches/publisher.py @@ -224,7 +224,7 @@ class Publisher(Channeler, Synchronized): assert isinstance(xchg, (six.binary_type, six.text_type)) self._pub(msg, xchg, rk, parent_span, span_enqueued, dont_close_span=True) - def _on_cnpub_delivery(self, payload): + def _on_cnpub_delivery(self, payload): # type: (AMQPMethodPayload) -> None """ This gets called on BasicAck and BasicNack, if mode is MODE_CNPUB """ @@ -293,7 +293,7 @@ class Publisher(Channeler, Synchronized): else: raise Exception(u'Invalid mode') - def on_operational(self, operational): + def on_operational(self, operational): # type: (bool) -> None state = {True: u'up', False: u'down'}[operational] mode = \ {Publisher.MODE_NOACK: u'noack', Publisher.MODE_CNPUB: u'cnpub'}[ diff --git a/coolamqp/clustering/cluster.py b/coolamqp/clustering/cluster.py index 8834fb0557edf61a3010e0f7900f0ebd03462668..551c21001f77b8890b5624ab3f616071b5f013a6 100644 --- a/coolamqp/clustering/cluster.py +++ b/coolamqp/clustering/cluster.py @@ -10,7 +10,7 @@ import typing as tp import warnings from concurrent.futures import Future -import monotonic +from coolamqp.utils import monotonic import six from coolamqp.attaches import Publisher, AttacheGroup, Consumer, Declarer @@ -305,8 +305,8 @@ class Cluster(object): if wait: # this is only going to take a short amount of time, so we're fine with polling - start_at = monotonic.monotonic() - while not self.connected and monotonic.monotonic() - start_at < timeout: + start_at = monotonic() + while not self.connected and monotonic() - start_at < timeout: time.sleep(0.1) if not self.connected: raise ConnectionDead( diff --git a/coolamqp/clustering/single.py b/coolamqp/clustering/single.py index 5757f7d20d1b216af9cb164f66cfc45353236743..8d3b883213744c27d4c5839b8b9727d9483a75ff 100644 --- a/coolamqp/clustering/single.py +++ b/coolamqp/clustering/single.py @@ -2,6 +2,7 @@ from __future__ import print_function, absolute_import, division import logging +import typing as tp from coolamqp.framing.definitions import ConnectionUnblocked, ConnectionBlocked from coolamqp.objects import Callable @@ -31,6 +32,7 @@ class SingleNodeReconnector(object): self.name = name or 'CoolAMQP' self.terminating = False + self.timeout = None self.on_fail = Callable() #: public self.on_blocked = Callable() #: public @@ -39,9 +41,12 @@ class SingleNodeReconnector(object): def is_connected(self): # type: () -> bool return self.connection is not None - def connect(self, timeout): # type: (float) -> None + def connect(self, timeout=None): # type: (tp.Optional[float]) -> None assert self.connection is None + timeout = timeout or self.timeout + self.timeout = timeout + # Initiate connecting - this order is very important! self.connection = Connection(self.node_def, self.listener_thread, extra_properties=self.extra_properties, diff --git a/coolamqp/uplink/connection/connection.py b/coolamqp/uplink/connection/connection.py index 5a7e0f0c81519cdb5916a5de63da1422dc284618..f17ebee29a9b49b3f23218d7fcf2a744b6cfe596 100644 --- a/coolamqp/uplink/connection/connection.py +++ b/coolamqp/uplink/connection/connection.py @@ -8,7 +8,7 @@ import time import typing as tp import uuid -import monotonic +from coolamqp.utils import monotonic from coolamqp.exceptions import ConnectionDead from coolamqp.framing.base import AMQPMethodPayload @@ -151,7 +151,7 @@ class Connection(object): while len(self.callables_on_connected) > 0: self.callables_on_connected.pop()() - def start(self, timeout): + def start(self, timeout=None): # type: (tp.Optional[float]) -> None """ Start processing events for this connect. Create the socket, transmit 'AMQP\x00\x00\x09\x01' and roll. @@ -160,15 +160,16 @@ class Connection(object): """ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - start_at = monotonic.monotonic() + start_at = monotonic() while True: try: sock.connect( (self.node_definition.host, self.node_definition.port)) except socket.error as e: time.sleep(0.5) # Connection refused? Very bad things? - if monotonic.monotonic() - start_at > timeout: - raise ConnectionDead() + if timeout is not None: + if monotonic() - start_at > timeout: + raise ConnectionDead() else: break @@ -260,7 +261,7 @@ class Connection(object): """ if self.log_frames is not None: for frame in frames: - self.log_frames.on_frame(time.monotonic(), frame, 'to_server') + self.log_frames.on_frame(monotonic(), frame, 'to_server') if frames is not None: self.sendf.send(frames, priority=priority) @@ -280,7 +281,7 @@ class Connection(object): :param frame: AMQPFrame that was received """ if self.log_frames is not None: - self.log_frames.on_frame(time.monotonic(), frame, 'to_client') + self.log_frames.on_frame(monotonic(), frame, 'to_client') watch_handled = False # True if ANY watch handled this diff --git a/coolamqp/uplink/handshake.py b/coolamqp/uplink/handshake.py index eb5a87c5a84facfc0ec7f3261a7e63e9f1b200b7..162d555b92bceeb4f11ebfca982a98490b44de68 100644 --- a/coolamqp/uplink/handshake.py +++ b/coolamqp/uplink/handshake.py @@ -21,9 +21,8 @@ CONNECTION_BLOCKED = b'connection.blocked' SUPPORTED_EXTENSIONS = [ PUBLISHER_CONFIRMS, - CONSUMER_CANCEL_NOTIFY, + CONSUMER_CANCEL_NOTIFY, # half assed support - we just .cancel the consumer, see #12 CONNECTION_BLOCKED - # half assed support - we just .cancel the consumer, see #12 ] CLIENT_DATA = [ @@ -105,12 +104,12 @@ class Handshaker(object): self.connection.watchdog(WATCHDOG_TIMEOUT, self.on_watchdog) self.connection.watch_for_method(0, ConnectionTune, self.on_connection_tune) - global CLIENT_DATA - CLIENT_DATA = copy.copy(CLIENT_DATA) - CLIENT_DATA.extend(self.EXTRA_PROPERTIES) + + CLIENT_DATA_c = copy.copy(CLIENT_DATA) + CLIENT_DATA_c.extend(self.EXTRA_PROPERTIES) self.connection.send([ AMQPMethodFrame(0, - ConnectionStartOk(CLIENT_DATA, b'PLAIN', + ConnectionStartOk(CLIENT_DATA_c, b'PLAIN', b'\x00' + self.login + b'\x00' + self.password, locale_supported[0] )) diff --git a/coolamqp/uplink/heartbeat.py b/coolamqp/uplink/heartbeat.py index fcc3bae6cb95ae7eae415e733cf8eff0d56d1f98..89623d59b432171541d592ecce02ee558e054070 100644 --- a/coolamqp/uplink/heartbeat.py +++ b/coolamqp/uplink/heartbeat.py @@ -3,7 +3,7 @@ from __future__ import absolute_import, division, print_function import typing as tp -import monotonic +from coolamqp.utils import monotonic from coolamqp.framing.frames import AMQPHeartbeatFrame from coolamqp.uplink.connection.watches import AnyWatch @@ -20,13 +20,13 @@ class Heartbeater(object): self.connection = connection self.heartbeat_interval = heartbeat_interval - self.last_heartbeat_on = monotonic.monotonic() # last heartbeat from server + self.last_heartbeat_on = monotonic() # last heartbeat from server self.connection.watchdog(self.heartbeat_interval, self.on_timer) self.connection.watch(AnyWatch(self.on_heartbeat)) def on_heartbeat(self, frame): - self.last_heartbeat_on = monotonic.monotonic() + self.last_heartbeat_on = monotonic() def on_any_frame(self): """ @@ -41,14 +41,14 @@ class Heartbeater(object): Anyway, we should register an all-watch for this. """ - self.last_heartbeat_on = monotonic.monotonic() + self.last_heartbeat_on = monotonic() def on_timer(self): """Timer says we should send a heartbeat""" self.connection.send([AMQPHeartbeatFrame()], priority=True) if ( - monotonic.monotonic() - self.last_heartbeat_on) > 2 * self.heartbeat_interval: + monotonic() - self.last_heartbeat_on) > 2 * self.heartbeat_interval: # closing because of heartbeat self.connection.send(None) diff --git a/coolamqp/uplink/listener/base_listener.py b/coolamqp/uplink/listener/base_listener.py new file mode 100644 index 0000000000000000000000000000000000000000..e48dc5f7a9d980a749bfcbd5caa452d5c03ae872 --- /dev/null +++ b/coolamqp/uplink/listener/base_listener.py @@ -0,0 +1,100 @@ +from abc import ABCMeta, abstractmethod +import heapq +import typing as tp +import six +from coolamqp.utils import monotonic + + +class BaseListener(object): + __metaclass__ = ABCMeta + + def __init__(self): + self.fd_to_sock = {} # type: tp.Dict[int, BaseSocket] + self.time_events = [] # type: tp.List[tp.Tuple[float, int, tp.Callable[[], None]]] + + def do_timer_events(self): + # Timer events + mono = monotonic() + while len(self.time_events) > 0 and (self.time_events[0][0] < mono): + ts, fd, callback = heapq.heappop(self.time_events) + callback() + + def oneshot(self, sock, delta, callback): + """ + A socket registers a time callback + :param sock: BaseSocket instance + :param delta: "this seconds after now" + :param callback: callable/0 + """ + if sock.fileno() in self.fd_to_sock: + heapq.heappush(self.time_events, (monotonic() + delta, + sock.fileno(), + callback + )) + + def noshot(self, sock): # type: (BaseSocket) -> None + """ + Clear all one-shots for a socket + :param sock: BaseSocket instance + """ + fd = sock.fileno() + self.time_events = [q for q in self.time_events if q[1] != fd] + + @abstractmethod + def wait(self, timeout=1): + """ + This will be executed in a loop. + + This must call .do_timer_events() + """ + + def close_socket(self, sock): # type: (BaseSocket) -> None + del self.fd_to_sock[sock.fileno()] + sock.on_fail() + self.noshot(sock) + sock.close() + + def shutdown(self): + """ + Forcibly close all sockets that this manages (calling their on_fail's), + and close the object. + + This object is unusable after this call. + """ + self.time_events = [] + for sock in list(six.itervalues(self.fd_to_sock)): + sock.on_fail() + sock.close() + + self.fd_to_sock = {} + + def oneshot(self, sock, delta, callback): + """ + A socket registers a time callback + :param sock: BaseSocket instance + :param delta: "this seconds after now" + :param callback: callable/0 + """ + if sock.fileno() in self.fd_to_sock: + heapq.heappush(self.time_events, (monotonic() + delta, + sock.fileno(), + callback + )) + + def activate(self, sock): # type: (BaseSocket) -> None + self.fd_to_sock[sock.fileno()] = sock + + @abstractmethod + def register(self, sock, # type: socket.socket + on_read=lambda data: None, # type: tp.Callable[[bytearray], None] + on_fail=lambda: None # type: tp.Callable[[], None] + ): # type: () -> BaseSocket + """ + This has to return a particular Socket instance, adapted to the needs of the listener. + + :param sock: a socket instance (as returned by socket module) + :param on_read: callable(data) to be called with received data + :param on_fail: callable() to be called when socket fails + + :return: a BaseSocket's subclass instance to use instead of this socket + """ diff --git a/coolamqp/uplink/listener/epoll_listener.py b/coolamqp/uplink/listener/epoll_listener.py index 6258fe5793f51ecf79ca0f1f04dad5ca5b1c8055..7e38a02ff9dcef2410c019589d3539a276b53aae 100644 --- a/coolamqp/uplink/listener/epoll_listener.py +++ b/coolamqp/uplink/listener/epoll_listener.py @@ -1,16 +1,16 @@ # coding=UTF-8 from __future__ import absolute_import, division, print_function -import collections -import heapq import logging import select import socket +import threading -import monotonic import six from coolamqp.uplink.listener.socket import SocketFailed, BaseSocket +from coolamqp.uplink.listener.base_listener import BaseListener + logger = logging.getLogger(__name__) @@ -19,18 +19,6 @@ RW = RO | select.EPOLLOUT class EpollSocket(BaseSocket): - """ - EpollListener substitutes your BaseSockets with this - :type sock: socket.socket - :type on_read: tp.Callable[[bytes], None] - :type on_fail: tp.Callable[[], None] - :type listener: coolamqp.uplink.listener.ListenerThread - """ - - def __init__(self, sock, on_read, on_fail, listener): - BaseSocket.__init__(self, sock, on_read=on_read, on_fail=on_fail) - self.listener = listener - self.priority_queue = collections.deque() def send(self, data, priority=False): """ @@ -43,47 +31,28 @@ class EpollSocket(BaseSocket): # silence. If there are errors, it's gonna get nuked soon. pass - def oneshot(self, seconds_after, callable): - """ - Set to fire a callable N seconds after - :param seconds_after: seconds after this - :param callable: callable/0 - """ - self.listener.oneshot(self, seconds_after, callable) - - def noshot(self): - """ - Clear all time-delayed callables. - - This will make no time-delayed callables delivered if ran in listener thread - """ - self.listener.noshot(self) - -class EpollListener(object): +class EpollListener(BaseListener): """ A listener using epoll. """ def __init__(self): self.epoll = select.epoll() - self.fd_to_sock = {} - self.time_events = [] + self.socket_activation_lock = threading.Lock() self.sockets_to_activate = [] + super(EpollListener, self).__init__() def wait(self, timeout=1): - for socket_to_activate in self.sockets_to_activate: - logger.debug('Activating fd %s', (socket_to_activate.fileno(),)) - self.epoll.register(socket_to_activate.fileno(), RW) - self.sockets_to_activate = [] + with self.socket_activation_lock: + for socket_to_activate in self.sockets_to_activate: + logger.debug('Activating fd %s', (socket_to_activate.fileno(),)) + self.epoll.register(socket_to_activate.fileno(), RW) + self.sockets_to_activate = [] events = self.epoll.poll(timeout=timeout) - # Timer events - mono = monotonic.monotonic() - while len(self.time_events) > 0 and (self.time_events[0][0] < mono): - ts, fd, callback = heapq.heappop(self.time_events) - callback() + self.do_timer_events() for fd, event in events: sock = self.fd_to_sock[fd] @@ -98,7 +67,6 @@ class EpollListener(object): sock.on_read() if event & select.EPOLLOUT: - sock.on_write() # I'm done with sending for now if len(sock.data_to_send) == 0 and len( @@ -107,24 +75,16 @@ class EpollListener(object): except SocketFailed as e: logger.debug('Socket %s has raised %s', fd, e) - self.epoll.unregister(fd) - del self.fd_to_sock[fd] - sock.on_fail() - self.noshot(sock) - sock.close() + self.close_socket(sock) # Do any of the sockets want to send data Re-register them - for socket in self.fd_to_sock.values(): - if socket.wants_to_send_data(): - self.epoll.modify(socket.fileno(), RW) + for sock in six.itervalues(self.fd_to_sock): + if sock.wants_to_send_data(): + self.epoll.modify(sock.fileno(), RW) - def noshot(self, sock): - """ - Clear all one-shots for a socket - :param sock: BaseSocket instance - """ - fd = sock.fileno() - self.time_events = [q for q in self.time_events if q[1] != fd] + def close_socket(self, sock): # type: (BaseSocket) -> None + self.epoll.unregister(sock.fileno()) + super(EpollListener, self).close_socket(sock) def shutdown(self): """ @@ -133,42 +93,25 @@ class EpollListener(object): This object is unusable after this call. """ - self.time_events = [] - for sock in list(six.itervalues(self.fd_to_sock)): - sock.on_fail() - sock.close() - - self.fd_to_sock = {} + super(EpollListener, self).shutdown() self.epoll.close() - def oneshot(self, sock, delta, callback): - """ - A socket registers a time callback - :param sock: BaseSocket instance - :param delta: "this seconds after now" - :param callback: callable/0 - """ - if sock.fileno() in self.fd_to_sock: - heapq.heappush(self.time_events, (monotonic.monotonic() + delta, - sock.fileno(), - callback - )) - - def activate(self, sock): # type: (coolamqp.uplink.listener.epoll_listener.EpollSocket) -> None - self.sockets_to_activate.append(sock) + def activate(self, sock): # type: (BaseSocket) -> None + super(EpollListener, self).activate(sock) + with self.socket_activation_lock: + self.sockets_to_activate.append(sock) def register(self, sock, on_read=lambda data: None, on_fail=lambda: None): """ Add a socket to be listened for by the loop. + Please note that .activate() will be later called on this socket. + :param sock: a socket instance (as returned by socket module) :param on_read: callable(data) to be called with received data :param on_fail: callable() to be called when socket fails :return: a BaseSocket instance to use instead of this socket """ - sock = EpollSocket(sock, on_read, on_fail, self) - self.fd_to_sock[sock.fileno()] = sock - - return sock + return EpollSocket(sock, on_read, on_fail=on_fail, listener=self) diff --git a/coolamqp/uplink/listener/select_listener.py b/coolamqp/uplink/listener/select_listener.py new file mode 100644 index 0000000000000000000000000000000000000000..75719ba6c2b161ece5d256fa208785b8252d4dab --- /dev/null +++ b/coolamqp/uplink/listener/select_listener.py @@ -0,0 +1,75 @@ +# coding=UTF-8 +from __future__ import absolute_import, division, print_function + +import logging +import select +import socket + +import six + +from coolamqp.uplink.listener.socket import SocketFailed, BaseSocket +from coolamqp.uplink.listener.base_listener import BaseListener + +logger = logging.getLogger(__name__) + +RO = select.EPOLLIN | select.EPOLLHUP | select.EPOLLERR +RW = RO | select.EPOLLOUT + + +class SelectListener(BaseListener): + """ + A listener using select + """ + + def wait(self, timeout=0.5): + rds_and_exs = [] # waiting both for read and for exception + wrs = [] # waiting for write + for sock in six.itervalues(self.fd_to_sock): + rds_and_exs.append(sock) + if sock.wants_to_send_data(): + wrs.append(sock) + + self.do_timer_events() + + try: + rds, wrs, exs = select.select(rds_and_exs, wrs, rds_and_exs, timeout) + except (select.error, socket.error, IOError): + for sock in rds_and_exs: + try: + select.select([sock], [], [], timeout=0) + except (select.error, socket.error, IOError): + self.close_socket(sock) + return + else: + return + + for sock_rd in rds: + try: + sock_rd.on_read() + except SocketFailed: + return self.close_socket(sock_rd) + + for sock_wr in wrs: + try: + sock_wr.on_write() + except SocketFailed: + return self.close_socket(sock_wr) + + for sock_ex in exs: + try: + sock_rd.on_read() + except SocketFailed: + return self.close_socket(sock_ex) + + def register(self, sock, on_read=lambda data: None, + on_fail=lambda: None): + """ + Add a socket to be listened for by the loop. + + :param sock: a socket instance (as returned by socket module) + :param on_read: callable(data) to be called with received data + :param on_fail: callable() to be called when socket fails + + :return: a BaseSocket instance to use instead of this socket + """ + return BaseSocket(sock, on_read, on_fail=on_fail, listener=self) diff --git a/coolamqp/uplink/listener/socket.py b/coolamqp/uplink/listener/socket.py index f64bc3ffa67747cb6f402f6291d43fafdef2efb1..d149e1726e26d0859942240bdb22161294b5f18c 100644 --- a/coolamqp/uplink/listener/socket.py +++ b/coolamqp/uplink/listener/socket.py @@ -3,6 +3,7 @@ from __future__ import absolute_import, division, print_function import collections import logging +from abc import ABCMeta, abstractmethod import socket logger = logging.getLogger(__name__) @@ -20,10 +21,12 @@ class BaseSocket(object): To be instantiated only by Listeners. """ + __metaclass__ = ABCMeta def __init__(self, sock, on_read=lambda data: None, on_time=lambda: None, - on_fail=lambda: None): + on_fail=lambda: None, + listener=None): """ :param sock: socketobject @@ -35,6 +38,7 @@ class BaseSocket(object): Listener thread context. Socket descriptor will be handled by listener. This should not + :param listener: listener that registered this socket """ assert sock is not None self.sock = sock @@ -44,6 +48,7 @@ class BaseSocket(object): self._on_fail = on_fail self.on_time = on_time self.is_failed = False + self.listener = listener def on_fail(self): self.is_failed = True @@ -76,7 +81,7 @@ class BaseSocket(object): :param seconds_after: seconds after this :param callable: callable/0 """ - raise Exception('Abstract; listener should override that') + self.listener.oneshot(self, seconds_after, callable) def noshot(self): """ @@ -84,11 +89,12 @@ class BaseSocket(object): This will make no time-delayed callables delivered if ran in listener thread """ - raise Exception('Abstract; listener should override that') + self.listener.noshot(self) - def on_read(self): + def on_read(self): # type: () -> None """Socket is readable, called by Listener""" - if self.is_failed: return + if self.is_failed: + return try: data = self.sock.recv(2048) except (IOError, socket.error) as e: @@ -105,13 +111,16 @@ class BaseSocket(object): def wants_to_send_data(self): # type: () -> bool return not (len(self.data_to_send) == 0 and len(self.priority_queue) == 0) - def on_write(self): + def on_write(self): # type: () -> None """ Socket is writable, called by Listener + :raises SocketFailed: on socket error + :return: True if I'm done sending shit for now """ - if self.is_failed: return False + if self.is_failed: + return False while True: if len(self.data_to_send) == 0: @@ -146,6 +155,6 @@ class BaseSocket(object): """Return descriptor number""" return self.sock.fileno() - def close(self): + def close(self): # type: () -> None """Close this socket""" self.sock.close() diff --git a/coolamqp/uplink/listener/thread.py b/coolamqp/uplink/listener/thread.py index 0feea2a2d23ec9272211944f27788fb6af83b81a..52f918ff65175ec169f4f0a17f1976134dc306de 100644 --- a/coolamqp/uplink/listener/thread.py +++ b/coolamqp/uplink/listener/thread.py @@ -2,10 +2,39 @@ from __future__ import absolute_import, division, print_function import threading +import logging import typing as tp - +import os from coolamqp.objects import Callable from coolamqp.uplink.listener.epoll_listener import EpollListener +from coolamqp.uplink.listener.select_listener import SelectListener +from coolamqp.uplink.listener.base_listener import BaseListener +from coolamqp.utils import prctl_set_name + +logger = logging.getLogger(__name__) + + +def get_listener_class(): # type: () -> tp.Type[BaseListener] + + if 'COOLAMQP_FORCE_SELECT_LISTENER' in os.environ: + return SelectListener + + try: + import select + select.epoll + except AttributeError: + return SelectListener # we're running on a platform that doesn't support epoll + + try: + import gevent.socket + except ImportError: + return EpollListener + import socket + + if socket.socket is gevent.socket.socket: + return SelectListener # gevent is active + + return EpollListener class ListenerThread(threading.Thread): @@ -16,12 +45,12 @@ class ListenerThread(threading.Thread): """ def __init__(self, name=None): # type: (tp.Optional[str]) - threading.Thread.__init__(self, name='coolamqp/ListenerThread') + super(ListenerThread, self).__init__(name=name or 'coolamqp/ListenerThread') self.daemon = True self.name = name or 'CoolAMQP' self.terminating = False self._call_next_io_event = Callable(oneshots=True) - self.listener = None + self.listener = None # type: BaseListener def call_next_io_event(self, callable): """ @@ -32,34 +61,34 @@ class ListenerThread(threading.Thread): all these are done. :param callable: callable/0 """ - self._call_next_io_event() + pass +# self._call_next_io_event.add(callable) - dummy that out, causes AssertionError to appear def terminate(self): self.terminating = True def init(self): """Called before start. It is not safe to fork after this""" - self.listener = EpollListener() + listener_class = get_listener_class() + logger.info('Using %s as a listener' % (listener_class, )) + self.listener = listener_class() def activate(self, sock): self.listener.activate(sock) def run(self): - try: - import prctl - except ImportError: - pass - else: - prctl.set_name(self.name + ' - AMQP listener thread') + prctl_set_name(self.name + '- listener thread') while not self.terminating: - self.listener.wait(timeout=1) + self.listener.wait() + self._call_next_io_event() self.listener.shutdown() def register(self, sock, # type: socket.socket on_read=lambda data: None, # type: tp.Callable[[bytes], None] - on_fail=lambda: None): # type: tp.Callable[[], None] + on_fail=lambda: None # type: tp.Callable[[], None] + ): """ Add a socket to be listened for by the loop. diff --git a/coolamqp/utils.py b/coolamqp/utils.py new file mode 100644 index 0000000000000000000000000000000000000000..976207a6a1a4f87e8ff3dbe4c407f074058caef6 --- /dev/null +++ b/coolamqp/utils.py @@ -0,0 +1,19 @@ + +try: + IMPORT_ERRORS = (ModuleNotFoundError, ImportError) +except NameError: + IMPORT_ERRORS = (ImportError, ) + +try: + from time import monotonic +except IMPORT_ERRORS: + from monotonic import monotonic + +try: + from prctl import set_name as prctl_set_name +except IMPORT_ERRORS: + def prctl_set_name(name): + pass + + +__all__ = ['monotonic', 'prctl_set_name'] diff --git a/setup.py b/setup.py index 92f16b786bdaa9cbacea66518b31c42fcd10f67e..5345ecae35f69d99468e2ee1fc17caeaae97dc5e 100644 --- a/setup.py +++ b/setup.py @@ -7,13 +7,14 @@ from coolamqp import __version__ setup(keywords=['amqp', 'rabbitmq', 'client', 'network', 'ha', 'high availability'], version=__version__, packages=find_packages(include=['coolamqp', 'coolamqp.*']), - install_requires=['six', 'monotonic'], + install_requires=['six'], # per coverage version for codeclimate-reporter tests_require=["nose2", "coverage", "nose2[coverage_plugin]"], test_suite='nose2.collector.collector', extras_require={ - ':python_version == "2.7"': ['futures', 'typing'], + ':python_version == "2.7"': ['futures', 'typing', 'monotonic'], 'prctl': ['prctl'], 'opentracing': ['opentracing'], + 'gevent': ['gevent'] } ) diff --git a/tests/test_clustering/test_a.py b/tests/test_clustering/test_a.py index 11727c3efe798cb688602b5b41444ffa115d1b78..4560d98c240f00c6c2b564eb6c76241586ed9d25 100644 --- a/tests/test_clustering/test_a.py +++ b/tests/test_clustering/test_a.py @@ -9,7 +9,7 @@ import os import time import unittest -import monotonic +from coolamqp.utils import monotonic import six from coolamqp.clustering import Cluster, MessageReceived, NothingMuch @@ -52,11 +52,11 @@ class TestA(unittest.TestCase): # self.assertEquals(rmsg.body, data) def test_actually_waits(self): - a = monotonic.monotonic() + a = monotonic() self.c.drain(5) - self.assertTrue(monotonic.monotonic() - a >= 4) + self.assertGreaterEqual(monotonic() - a, 4) def test_set_qos_but_later(self): con, fut = self.c.consume(Queue(u'hello2', exclusive=True)) diff --git a/tests/utils.py b/tests/utils.py index abfdc412ad186cb916fcd4e66608641564e708d6..02058e9d4e3445a4e11dd869a882b2d95e47dac6 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -6,8 +6,9 @@ import os import socket import time import unittest +import six -import monotonic +from coolamqp.utils import monotonic from coolamqp.backends.base import AMQPBackend, ConnectionFailedError from coolamqp import Cluster, ClusterNode, ConnectionDown, \ @@ -47,9 +48,9 @@ class CoolAMQPTestCase(unittest.TestCase): def drainToAny(self, types, timeout, forbidden=[]): """Assert that messages with types, in any order, are found within timeout. Fail if any type from forbidden is found""" - start = monotonic.monotonic() + start = monotonic() types = set(types) - while monotonic.monotonic() - start < timeout: + while monotonic() - start < timeout: q = self.amqp.drain(1) if type(q) in forbidden: self.fail('%s found', type(q)) @@ -73,8 +74,8 @@ class CoolAMQPTestCase(unittest.TestCase): self.fail('Found %s but forbidden', type(p)) return p - start = monotonic.monotonic() - while monotonic.monotonic() - start < timeout: + start = monotonic() + while monotonic() - start < timeout: q = self.amqp.drain(1) if isinstance(q, type_): return q