From e49f09c4d29a070e45c4772a54f5514e44d56ec7 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Piotr=20Ma=C5=9Blanka?= <piotr.maslanka@henrietta.com.pl>
Date: Thu, 21 May 2020 19:12:46 +0200
Subject: [PATCH] Integrate select-based listener (#48)

* add SelectListener

* add SelectListener

* test out select listener

* fix tests

* fix tests

* fix tests

* fix tests

* fix tests

* refactor

* refactor

* more secure registering in epoll listener

* more secure registering in epoll listener, v1.1.1

* reduced timeout for select listener

* refactor

* refactor

* refactor

* got rid of monotonic dependency for Python 3.x

* instantiate listener_class

* daemonize the thread in constructor

* daemonize the thread in constructor

* refactor

* refactor

* refactor

* refactor

* more refactor

* more refactor

* more refactor

* more refactor

* don't overwrite client data

* run these tests verbose

* run these tests verbose

* ImportError fix

* fix tests for monotonic
---
 .travis.yml                                 |   8 +-
 CHANGELOG.md                                |   3 +-
 README.md                                   |   4 +-
 compile_definitions/xml_fields.py           |   2 +-
 coolamqp/__init__.py                        |   2 +-
 coolamqp/attaches/channeler.py              |   8 +-
 coolamqp/attaches/publisher.py              |   4 +-
 coolamqp/clustering/cluster.py              |   6 +-
 coolamqp/clustering/single.py               |   7 +-
 coolamqp/uplink/connection/connection.py    |  15 +--
 coolamqp/uplink/handshake.py                |  11 +-
 coolamqp/uplink/heartbeat.py                |  10 +-
 coolamqp/uplink/listener/base_listener.py   | 100 ++++++++++++++++++
 coolamqp/uplink/listener/epoll_listener.py  | 111 +++++---------------
 coolamqp/uplink/listener/select_listener.py |  75 +++++++++++++
 coolamqp/uplink/listener/socket.py          |  25 +++--
 coolamqp/uplink/listener/thread.py          |  55 +++++++---
 coolamqp/utils.py                           |  19 ++++
 setup.py                                    |   5 +-
 tests/test_clustering/test_a.py             |   6 +-
 tests/utils.py                              |  11 +-
 21 files changed, 337 insertions(+), 150 deletions(-)
 create mode 100644 coolamqp/uplink/listener/base_listener.py
 create mode 100644 coolamqp/uplink/listener/select_listener.py
 create mode 100644 coolamqp/utils.py

diff --git a/.travis.yml b/.travis.yml
index 7e506eb..f42755e 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -13,10 +13,12 @@ before_script:
   - curl -L https://codeclimate.com/downloads/test-reporter/test-reporter-latest-linux-amd64 > ./cc-test-reporter
   - chmod +x ./cc-test-reporter
   - ./cc-test-reporter before-build
+  - pip install nose2 coverage
 script:
-  - python -m compile_definitions
-  - python setup.py test
-  - python -m stress_tests
+  - coverage run -m compile_definitions
+  - coverage run --append -m nose2 -vv
+  - COOLAMQP_FORCE_SELECT_LISTENER=1 coverage run --append -m nose2 -vv
+  - coverage run --append -m stress_tests
 install:
   - python setup.py install
   - pip install -r stress_tests/requirements.txt
diff --git a/CHANGELOG.md b/CHANGELOG.md
index a00c02e..bff2952 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,6 +1,7 @@
 # v1.1.1
 
-* _TBA_
+* added `SelectListener`, which means CoolAMQP can 
+  run now on Windows and in gevent environments
 
 # v1.1
 
diff --git a/README.md b/README.md
index 48156d7..1129452 100644
--- a/README.md
+++ b/README.md
@@ -46,11 +46,13 @@ _Watch out for memoryviews!_ They're here to stay.
 Assertions are sprinkled throughout the code. You may wish to run with optimizations enabled
 if you need every CPU cycle you can get.
 
+Note that if you define the environment variable of `COOLAMQP_FORCE_SELECT_LISTENER`, 
+CoolAMQP will use select-based networking instead of epoll based.
+
 ## Current limitations
 
 * channel flow mechanism is not supported (#11)
 * _confirm=True_ is not available if you're not RabbitMQ (#8)
-* no Windows support (#9)
 
 
 ## Copyright holder change
diff --git a/compile_definitions/xml_fields.py b/compile_definitions/xml_fields.py
index c3c6808..48be6c6 100644
--- a/compile_definitions/xml_fields.py
+++ b/compile_definitions/xml_fields.py
@@ -32,7 +32,7 @@ class BaseField(object):
         self.field_name = field_name
 
     def find(self, elem):
-        raise NotImplementedError('abstract')
+        raise NotImplementedError('Abstract method!')
 
 
 class ComputedField(BaseField):
diff --git a/coolamqp/__init__.py b/coolamqp/__init__.py
index 7d663af..d1d9708 100644
--- a/coolamqp/__init__.py
+++ b/coolamqp/__init__.py
@@ -1,2 +1,2 @@
 # coding=UTF-8
-__version__ = '1.1.1_a2'
+__version__ = '1.1.1'
diff --git a/coolamqp/attaches/channeler.py b/coolamqp/attaches/channeler.py
index 3338e0a..b22bb47 100644
--- a/coolamqp/attaches/channeler.py
+++ b/coolamqp/attaches/channeler.py
@@ -199,10 +199,10 @@ class Channeler(Attache):
         assert self.channel_id is not None
         return self.connection.watch_for_method(self.channel_id, method, callback, on_fail=on_fail)
 
-    def method_and_watch(self, method_payload, method_classes_to_watch,
-                         callable):
-        # type: (coolamqp.framing.base.AMQPMethodPayload,
-        # tp.Iterable[type], tp.Callable[[coolamqp.framing.base.AMQPMethodPayload], None]) -> None
+    def method_and_watch(self, method_payload,      # type: coolamqp.framing.base.AMQPMethodPayload,
+                         method_classes_to_watch,   # type: tp.Iterable[tp.Type[AMQPMethodPayload]]
+                         callable                   # type: tp.Callable[[AMQPMethodPayload], None]
+                         ):     # type: () -> None
         """
         Syntactic sugar for
 
diff --git a/coolamqp/attaches/publisher.py b/coolamqp/attaches/publisher.py
index 09dd41c..a93b5d9 100644
--- a/coolamqp/attaches/publisher.py
+++ b/coolamqp/attaches/publisher.py
@@ -224,7 +224,7 @@ class Publisher(Channeler, Synchronized):
             assert isinstance(xchg, (six.binary_type, six.text_type))
             self._pub(msg, xchg, rk, parent_span, span_enqueued, dont_close_span=True)
 
-    def _on_cnpub_delivery(self, payload):
+    def _on_cnpub_delivery(self, payload):  # type: (AMQPMethodPayload) -> None
         """
         This gets called on BasicAck and BasicNack, if mode is MODE_CNPUB
         """
@@ -293,7 +293,7 @@ class Publisher(Channeler, Synchronized):
         else:
             raise Exception(u'Invalid mode')
 
-    def on_operational(self, operational):
+    def on_operational(self, operational):      # type: (bool) -> None
         state = {True: u'up', False: u'down'}[operational]
         mode = \
             {Publisher.MODE_NOACK: u'noack', Publisher.MODE_CNPUB: u'cnpub'}[
diff --git a/coolamqp/clustering/cluster.py b/coolamqp/clustering/cluster.py
index 8834fb0..551c210 100644
--- a/coolamqp/clustering/cluster.py
+++ b/coolamqp/clustering/cluster.py
@@ -10,7 +10,7 @@ import typing as tp
 import warnings
 from concurrent.futures import Future
 
-import monotonic
+from coolamqp.utils import monotonic
 import six
 
 from coolamqp.attaches import Publisher, AttacheGroup, Consumer, Declarer
@@ -305,8 +305,8 @@ class Cluster(object):
 
         if wait:
             # this is only going to take a short amount of time, so we're fine with polling
-            start_at = monotonic.monotonic()
-            while not self.connected and monotonic.monotonic() - start_at < timeout:
+            start_at = monotonic()
+            while not self.connected and monotonic() - start_at < timeout:
                 time.sleep(0.1)
             if not self.connected:
                 raise ConnectionDead(
diff --git a/coolamqp/clustering/single.py b/coolamqp/clustering/single.py
index 5757f7d..8d3b883 100644
--- a/coolamqp/clustering/single.py
+++ b/coolamqp/clustering/single.py
@@ -2,6 +2,7 @@
 from __future__ import print_function, absolute_import, division
 
 import logging
+import typing as tp
 
 from coolamqp.framing.definitions import ConnectionUnblocked, ConnectionBlocked
 from coolamqp.objects import Callable
@@ -31,6 +32,7 @@ class SingleNodeReconnector(object):
         self.name = name or 'CoolAMQP'
 
         self.terminating = False
+        self.timeout = None
 
         self.on_fail = Callable()  #: public
         self.on_blocked = Callable()  #: public
@@ -39,9 +41,12 @@ class SingleNodeReconnector(object):
     def is_connected(self):  # type: () -> bool
         return self.connection is not None
 
-    def connect(self, timeout):  # type: (float) -> None
+    def connect(self, timeout=None):  # type: (tp.Optional[float]) -> None
         assert self.connection is None
 
+        timeout = timeout or self.timeout
+        self.timeout = timeout
+
         # Initiate connecting - this order is very important!
         self.connection = Connection(self.node_def, self.listener_thread,
                                      extra_properties=self.extra_properties,
diff --git a/coolamqp/uplink/connection/connection.py b/coolamqp/uplink/connection/connection.py
index 5a7e0f0..f17ebee 100644
--- a/coolamqp/uplink/connection/connection.py
+++ b/coolamqp/uplink/connection/connection.py
@@ -8,7 +8,7 @@ import time
 import typing as tp
 import uuid
 
-import monotonic
+from coolamqp.utils import monotonic
 
 from coolamqp.exceptions import ConnectionDead
 from coolamqp.framing.base import AMQPMethodPayload
@@ -151,7 +151,7 @@ class Connection(object):
         while len(self.callables_on_connected) > 0:
             self.callables_on_connected.pop()()
 
-    def start(self, timeout):
+    def start(self, timeout=None):  # type: (tp.Optional[float]) -> None
         """
         Start processing events for this connect. Create the socket,
         transmit 'AMQP\x00\x00\x09\x01' and roll.
@@ -160,15 +160,16 @@ class Connection(object):
         """
 
         sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-        start_at = monotonic.monotonic()
+        start_at = monotonic()
         while True:
             try:
                 sock.connect(
                     (self.node_definition.host, self.node_definition.port))
             except socket.error as e:
                 time.sleep(0.5)  # Connection refused? Very bad things?
-                if monotonic.monotonic() - start_at > timeout:
-                    raise ConnectionDead()
+                if timeout is not None:
+                    if monotonic() - start_at > timeout:
+                        raise ConnectionDead()
             else:
                 break
 
@@ -260,7 +261,7 @@ class Connection(object):
         """
         if self.log_frames is not None:
             for frame in frames:
-                self.log_frames.on_frame(time.monotonic(), frame, 'to_server')
+                self.log_frames.on_frame(monotonic(), frame, 'to_server')
 
         if frames is not None:
             self.sendf.send(frames, priority=priority)
@@ -280,7 +281,7 @@ class Connection(object):
         :param frame: AMQPFrame that was received
         """
         if self.log_frames is not None:
-            self.log_frames.on_frame(time.monotonic(), frame, 'to_client')
+            self.log_frames.on_frame(monotonic(), frame, 'to_client')
 
         watch_handled = False  # True if ANY watch handled this
 
diff --git a/coolamqp/uplink/handshake.py b/coolamqp/uplink/handshake.py
index eb5a87c..162d555 100644
--- a/coolamqp/uplink/handshake.py
+++ b/coolamqp/uplink/handshake.py
@@ -21,9 +21,8 @@ CONNECTION_BLOCKED = b'connection.blocked'
 
 SUPPORTED_EXTENSIONS = [
     PUBLISHER_CONFIRMS,
-    CONSUMER_CANCEL_NOTIFY,
+    CONSUMER_CANCEL_NOTIFY,     # half assed support - we just .cancel the consumer, see #12
     CONNECTION_BLOCKED
-    # half assed support - we just .cancel the consumer, see #12
 ]
 
 CLIENT_DATA = [
@@ -105,12 +104,12 @@ class Handshaker(object):
         self.connection.watchdog(WATCHDOG_TIMEOUT, self.on_watchdog)
         self.connection.watch_for_method(0, ConnectionTune,
                                          self.on_connection_tune)
-        global CLIENT_DATA
-        CLIENT_DATA = copy.copy(CLIENT_DATA)
-        CLIENT_DATA.extend(self.EXTRA_PROPERTIES)
+
+        CLIENT_DATA_c = copy.copy(CLIENT_DATA)
+        CLIENT_DATA_c.extend(self.EXTRA_PROPERTIES)
         self.connection.send([
             AMQPMethodFrame(0,
-                            ConnectionStartOk(CLIENT_DATA, b'PLAIN',
+                            ConnectionStartOk(CLIENT_DATA_c, b'PLAIN',
                                               b'\x00' + self.login + b'\x00' + self.password,
                                               locale_supported[0]
                                               ))
diff --git a/coolamqp/uplink/heartbeat.py b/coolamqp/uplink/heartbeat.py
index fcc3bae..89623d5 100644
--- a/coolamqp/uplink/heartbeat.py
+++ b/coolamqp/uplink/heartbeat.py
@@ -3,7 +3,7 @@ from __future__ import absolute_import, division, print_function
 
 import typing as tp
 
-import monotonic
+from coolamqp.utils import monotonic
 
 from coolamqp.framing.frames import AMQPHeartbeatFrame
 from coolamqp.uplink.connection.watches import AnyWatch
@@ -20,13 +20,13 @@ class Heartbeater(object):
         self.connection = connection
         self.heartbeat_interval = heartbeat_interval
 
-        self.last_heartbeat_on = monotonic.monotonic()  # last heartbeat from server
+        self.last_heartbeat_on = monotonic()  # last heartbeat from server
 
         self.connection.watchdog(self.heartbeat_interval, self.on_timer)
         self.connection.watch(AnyWatch(self.on_heartbeat))
 
     def on_heartbeat(self, frame):
-        self.last_heartbeat_on = monotonic.monotonic()
+        self.last_heartbeat_on = monotonic()
 
     def on_any_frame(self):
         """
@@ -41,14 +41,14 @@ class Heartbeater(object):
 
         Anyway, we should register an all-watch for this.
         """
-        self.last_heartbeat_on = monotonic.monotonic()
+        self.last_heartbeat_on = monotonic()
 
     def on_timer(self):
         """Timer says we should send a heartbeat"""
         self.connection.send([AMQPHeartbeatFrame()], priority=True)
 
         if (
-                monotonic.monotonic() - self.last_heartbeat_on) > 2 * self.heartbeat_interval:
+                monotonic() - self.last_heartbeat_on) > 2 * self.heartbeat_interval:
             # closing because of heartbeat
             self.connection.send(None)
 
diff --git a/coolamqp/uplink/listener/base_listener.py b/coolamqp/uplink/listener/base_listener.py
new file mode 100644
index 0000000..e48dc5f
--- /dev/null
+++ b/coolamqp/uplink/listener/base_listener.py
@@ -0,0 +1,100 @@
+from abc import ABCMeta, abstractmethod
+import heapq
+import typing as tp
+import six
+from coolamqp.utils import monotonic
+
+
+class BaseListener(object):
+    __metaclass__ = ABCMeta
+
+    def __init__(self):
+        self.fd_to_sock = {}    # type: tp.Dict[int, BaseSocket]
+        self.time_events = []  # type: tp.List[tp.Tuple[float, int, tp.Callable[[], None]]]
+
+    def do_timer_events(self):
+        # Timer events
+        mono = monotonic()
+        while len(self.time_events) > 0 and (self.time_events[0][0] < mono):
+            ts, fd, callback = heapq.heappop(self.time_events)
+            callback()
+
+    def oneshot(self, sock, delta, callback):
+        """
+        A socket registers a time callback
+        :param sock: BaseSocket instance
+        :param delta: "this seconds after now"
+        :param callback: callable/0
+        """
+        if sock.fileno() in self.fd_to_sock:
+            heapq.heappush(self.time_events, (monotonic() + delta,
+                                              sock.fileno(),
+                                              callback
+                                              ))
+
+    def noshot(self, sock):     # type: (BaseSocket) -> None
+        """
+        Clear all one-shots for a socket
+        :param sock: BaseSocket instance
+        """
+        fd = sock.fileno()
+        self.time_events = [q for q in self.time_events if q[1] != fd]
+
+    @abstractmethod
+    def wait(self, timeout=1):
+        """
+        This will be executed in a loop.
+
+        This must call .do_timer_events()
+        """
+
+    def close_socket(self, sock):   # type: (BaseSocket) -> None
+        del self.fd_to_sock[sock.fileno()]
+        sock.on_fail()
+        self.noshot(sock)
+        sock.close()
+
+    def shutdown(self):
+        """
+        Forcibly close all sockets that this manages (calling their on_fail's),
+        and close the object.
+
+        This object is unusable after this call.
+        """
+        self.time_events = []
+        for sock in list(six.itervalues(self.fd_to_sock)):
+            sock.on_fail()
+            sock.close()
+
+        self.fd_to_sock = {}
+
+    def oneshot(self, sock, delta, callback):
+        """
+        A socket registers a time callback
+        :param sock: BaseSocket instance
+        :param delta: "this seconds after now"
+        :param callback: callable/0
+        """
+        if sock.fileno() in self.fd_to_sock:
+            heapq.heappush(self.time_events, (monotonic() + delta,
+                                              sock.fileno(),
+                                              callback
+                                              ))
+
+    def activate(self, sock):  # type: (BaseSocket) -> None
+        self.fd_to_sock[sock.fileno()] = sock
+
+    @abstractmethod
+    def register(self, sock,                    # type: socket.socket
+                 on_read=lambda data: None,     # type: tp.Callable[[bytearray], None]
+                 on_fail=lambda: None         # type: tp.Callable[[], None]
+                 ):                     # type: () -> BaseSocket
+        """
+        This has to return a particular Socket instance, adapted to the needs of the listener.
+
+        :param sock: a socket instance (as returned by socket module)
+        :param on_read: callable(data) to be called with received data
+        :param on_fail: callable() to be called when socket fails
+
+        :return: a BaseSocket's subclass instance to use instead of this socket
+        """
diff --git a/coolamqp/uplink/listener/epoll_listener.py b/coolamqp/uplink/listener/epoll_listener.py
index 6258fe5..7e38a02 100644
--- a/coolamqp/uplink/listener/epoll_listener.py
+++ b/coolamqp/uplink/listener/epoll_listener.py
@@ -1,16 +1,16 @@
 # coding=UTF-8
 from __future__ import absolute_import, division, print_function
 
-import collections
-import heapq
 import logging
 import select
 import socket
+import threading
 
-import monotonic
 import six
 
 from coolamqp.uplink.listener.socket import SocketFailed, BaseSocket
+from coolamqp.uplink.listener.base_listener import BaseListener
+
 
 logger = logging.getLogger(__name__)
 
@@ -19,18 +19,6 @@ RW = RO | select.EPOLLOUT
 
 
 class EpollSocket(BaseSocket):
-    """
-    EpollListener substitutes your BaseSockets with this
-    :type sock: socket.socket
-    :type on_read: tp.Callable[[bytes], None]
-    :type on_fail: tp.Callable[[], None]
-    :type listener: coolamqp.uplink.listener.ListenerThread
-    """
-
-    def __init__(self, sock, on_read, on_fail, listener):
-        BaseSocket.__init__(self, sock, on_read=on_read, on_fail=on_fail)
-        self.listener = listener
-        self.priority_queue = collections.deque()
 
     def send(self, data, priority=False):
         """
@@ -43,47 +31,28 @@ class EpollSocket(BaseSocket):
             # silence. If there are errors, it's gonna get nuked soon.
             pass
 
-    def oneshot(self, seconds_after, callable):
-        """
-        Set to fire a callable N seconds after
-        :param seconds_after: seconds after this
-        :param callable: callable/0
-        """
-        self.listener.oneshot(self, seconds_after, callable)
-
-    def noshot(self):
-        """
-        Clear all time-delayed callables.
-
-        This will make no time-delayed callables delivered if ran in listener thread
-        """
-        self.listener.noshot(self)
-
 
-class EpollListener(object):
+class EpollListener(BaseListener):
     """
     A listener using epoll.
     """
 
     def __init__(self):
         self.epoll = select.epoll()
-        self.fd_to_sock = {}
-        self.time_events = []
+        self.socket_activation_lock = threading.Lock()
         self.sockets_to_activate = []
+        super(EpollListener, self).__init__()
 
     def wait(self, timeout=1):
-        for socket_to_activate in self.sockets_to_activate:
-            logger.debug('Activating fd %s', (socket_to_activate.fileno(),))
-            self.epoll.register(socket_to_activate.fileno(), RW)
-        self.sockets_to_activate = []
+        with self.socket_activation_lock:
+            for socket_to_activate in self.sockets_to_activate:
+                logger.debug('Activating fd %s', (socket_to_activate.fileno(),))
+                self.epoll.register(socket_to_activate.fileno(), RW)
+            self.sockets_to_activate = []
 
         events = self.epoll.poll(timeout=timeout)
 
-        # Timer events
-        mono = monotonic.monotonic()
-        while len(self.time_events) > 0 and (self.time_events[0][0] < mono):
-            ts, fd, callback = heapq.heappop(self.time_events)
-            callback()
+        self.do_timer_events()
 
         for fd, event in events:
             sock = self.fd_to_sock[fd]
@@ -98,7 +67,6 @@ class EpollListener(object):
                     sock.on_read()
 
                 if event & select.EPOLLOUT:
-
                     sock.on_write()
                     # I'm done with sending for now
                     if len(sock.data_to_send) == 0 and len(
@@ -107,24 +75,16 @@ class EpollListener(object):
 
             except SocketFailed as e:
                 logger.debug('Socket %s has raised %s', fd, e)
-                self.epoll.unregister(fd)
-                del self.fd_to_sock[fd]
-                sock.on_fail()
-                self.noshot(sock)
-                sock.close()
+                self.close_socket(sock)
 
         # Do any of the sockets want to send data Re-register them
-        for socket in self.fd_to_sock.values():
-            if socket.wants_to_send_data():
-                self.epoll.modify(socket.fileno(), RW)
+        for sock in six.itervalues(self.fd_to_sock):
+            if sock.wants_to_send_data():
+                self.epoll.modify(sock.fileno(), RW)
 
-    def noshot(self, sock):
-        """
-        Clear all one-shots for a socket
-        :param sock: BaseSocket instance
-        """
-        fd = sock.fileno()
-        self.time_events = [q for q in self.time_events if q[1] != fd]
+    def close_socket(self, sock):  # type: (BaseSocket) -> None
+        self.epoll.unregister(sock.fileno())
+        super(EpollListener, self).close_socket(sock)
 
     def shutdown(self):
         """
@@ -133,42 +93,25 @@ class EpollListener(object):
 
         This object is unusable after this call.
         """
-        self.time_events = []
-        for sock in list(six.itervalues(self.fd_to_sock)):
-            sock.on_fail()
-            sock.close()
-
-        self.fd_to_sock = {}
+        super(EpollListener, self).shutdown()
         self.epoll.close()
 
-    def oneshot(self, sock, delta, callback):
-        """
-        A socket registers a time callback
-        :param sock: BaseSocket instance
-        :param delta: "this seconds after now"
-        :param callback: callable/0
-        """
-        if sock.fileno() in self.fd_to_sock:
-            heapq.heappush(self.time_events, (monotonic.monotonic() + delta,
-                                              sock.fileno(),
-                                              callback
-                                              ))
-
-    def activate(self, sock):  # type: (coolamqp.uplink.listener.epoll_listener.EpollSocket) -> None
-        self.sockets_to_activate.append(sock)
+    def activate(self, sock):  # type: (BaseSocket) -> None
+        super(EpollListener, self).activate(sock)
+        with self.socket_activation_lock:
+            self.sockets_to_activate.append(sock)
 
     def register(self, sock, on_read=lambda data: None,
                  on_fail=lambda: None):
         """
         Add a socket to be listened for by the loop.
 
+        Please note that .activate() will be later called on this socket.
+
         :param sock: a socket instance (as returned by socket module)
         :param on_read: callable(data) to be called with received data
         :param on_fail: callable() to be called when socket fails
 
         :return: a BaseSocket instance to use instead of this socket
         """
-        sock = EpollSocket(sock, on_read, on_fail, self)
-        self.fd_to_sock[sock.fileno()] = sock
-
-        return sock
+        return EpollSocket(sock, on_read, on_fail=on_fail, listener=self)
diff --git a/coolamqp/uplink/listener/select_listener.py b/coolamqp/uplink/listener/select_listener.py
new file mode 100644
index 0000000..75719ba
--- /dev/null
+++ b/coolamqp/uplink/listener/select_listener.py
@@ -0,0 +1,75 @@
+# coding=UTF-8
+from __future__ import absolute_import, division, print_function
+
+import logging
+import select
+import socket
+
+import six
+
+from coolamqp.uplink.listener.socket import SocketFailed, BaseSocket
+from coolamqp.uplink.listener.base_listener import BaseListener
+
+logger = logging.getLogger(__name__)
+
+RO = select.EPOLLIN | select.EPOLLHUP | select.EPOLLERR
+RW = RO | select.EPOLLOUT
+
+
+class SelectListener(BaseListener):
+    """
+    A listener using select
+    """
+
+    def wait(self, timeout=0.5):
+        rds_and_exs = []        # waiting both for read and for exception
+        wrs = []                # waiting for write
+        for sock in six.itervalues(self.fd_to_sock):
+            rds_and_exs.append(sock)
+            if sock.wants_to_send_data():
+                wrs.append(sock)
+
+        self.do_timer_events()
+
+        try:
+            rds, wrs, exs = select.select(rds_and_exs, wrs, rds_and_exs, timeout)
+        except (select.error, socket.error, IOError):
+            for sock in rds_and_exs:
+                try:
+                    select.select([sock], [], [], timeout=0)
+                except (select.error, socket.error, IOError):
+                    self.close_socket(sock)
+                    return
+            else:
+                return
+
+        for sock_rd in rds:
+            try:
+                sock_rd.on_read()
+            except SocketFailed:
+                return self.close_socket(sock_rd)
+
+        for sock_wr in wrs:
+            try:
+                sock_wr.on_write()
+            except SocketFailed:
+                return self.close_socket(sock_wr)
+
+        for sock_ex in exs:
+            try:
+                sock_rd.on_read()
+            except SocketFailed:
+                return self.close_socket(sock_ex)
+
+    def register(self, sock, on_read=lambda data: None,
+                 on_fail=lambda: None):
+        """
+        Add a socket to be listened for by the loop.
+
+        :param sock: a socket instance (as returned by socket module)
+        :param on_read: callable(data) to be called with received data
+        :param on_fail: callable() to be called when socket fails
+
+        :return: a BaseSocket instance to use instead of this socket
+        """
+        return BaseSocket(sock, on_read, on_fail=on_fail, listener=self)
diff --git a/coolamqp/uplink/listener/socket.py b/coolamqp/uplink/listener/socket.py
index f64bc3f..d149e17 100644
--- a/coolamqp/uplink/listener/socket.py
+++ b/coolamqp/uplink/listener/socket.py
@@ -3,6 +3,7 @@ from __future__ import absolute_import, division, print_function
 
 import collections
 import logging
+from abc import ABCMeta, abstractmethod
 import socket
 
 logger = logging.getLogger(__name__)
@@ -20,10 +21,12 @@ class BaseSocket(object):
 
     To be instantiated only by Listeners.
     """
+    __metaclass__ = ABCMeta
 
     def __init__(self, sock, on_read=lambda data: None,
                  on_time=lambda: None,
-                 on_fail=lambda: None):
+                 on_fail=lambda: None,
+                 listener=None):
         """
 
         :param sock: socketobject
@@ -35,6 +38,7 @@ class BaseSocket(object):
             Listener thread context.
             Socket descriptor will be handled by listener.
             This should not
+        :param listener: listener that registered this socket
         """
         assert sock is not None
         self.sock = sock
@@ -44,6 +48,7 @@ class BaseSocket(object):
         self._on_fail = on_fail
         self.on_time = on_time
         self.is_failed = False
+        self.listener = listener
 
     def on_fail(self):
         self.is_failed = True
@@ -76,7 +81,7 @@ class BaseSocket(object):
         :param seconds_after: seconds after this
         :param callable: callable/0
         """
-        raise Exception('Abstract; listener should override that')
+        self.listener.oneshot(self, seconds_after, callable)
 
     def noshot(self):
         """
@@ -84,11 +89,12 @@ class BaseSocket(object):
 
         This will make no time-delayed callables delivered if ran in listener thread
         """
-        raise Exception('Abstract; listener should override that')
+        self.listener.noshot(self)
 
-    def on_read(self):
+    def on_read(self):      # type: () -> None
         """Socket is readable, called by Listener"""
-        if self.is_failed: return
+        if self.is_failed:
+            return
         try:
             data = self.sock.recv(2048)
         except (IOError, socket.error) as e:
@@ -105,13 +111,16 @@ class BaseSocket(object):
     def wants_to_send_data(self):  # type: () -> bool
         return not (len(self.data_to_send) == 0 and len(self.priority_queue) == 0)
 
-    def on_write(self):
+    def on_write(self):      # type: () -> None
         """
         Socket is writable, called by Listener
+
         :raises SocketFailed: on socket error
+
         :return: True if I'm done sending shit for now
         """
-        if self.is_failed: return False
+        if self.is_failed:
+            return False
 
         while True:
             if len(self.data_to_send) == 0:
@@ -146,6 +155,6 @@ class BaseSocket(object):
         """Return descriptor number"""
         return self.sock.fileno()
 
-    def close(self):
+    def close(self):    # type: () -> None
         """Close this socket"""
         self.sock.close()
diff --git a/coolamqp/uplink/listener/thread.py b/coolamqp/uplink/listener/thread.py
index 0feea2a..52f918f 100644
--- a/coolamqp/uplink/listener/thread.py
+++ b/coolamqp/uplink/listener/thread.py
@@ -2,10 +2,39 @@
 from __future__ import absolute_import, division, print_function
 
 import threading
+import logging
 import typing as tp
-
+import os
 from coolamqp.objects import Callable
 from coolamqp.uplink.listener.epoll_listener import EpollListener
+from coolamqp.uplink.listener.select_listener import SelectListener
+from coolamqp.uplink.listener.base_listener import BaseListener
+from coolamqp.utils import prctl_set_name
+
+logger = logging.getLogger(__name__)
+
+
+def get_listener_class():   # type: () -> tp.Type[BaseListener]
+
+    if 'COOLAMQP_FORCE_SELECT_LISTENER' in os.environ:
+        return SelectListener
+
+    try:
+        import select
+        select.epoll
+    except AttributeError:
+        return SelectListener   # we're running on a platform that doesn't support epoll
+
+    try:
+        import gevent.socket
+    except ImportError:
+        return EpollListener
+    import socket
+
+    if socket.socket is gevent.socket.socket:
+        return SelectListener     # gevent is active
+
+    return EpollListener
 
 
 class ListenerThread(threading.Thread):
@@ -16,12 +45,12 @@ class ListenerThread(threading.Thread):
     """
 
     def __init__(self, name=None):  # type: (tp.Optional[str])
-        threading.Thread.__init__(self, name='coolamqp/ListenerThread')
+        super(ListenerThread, self).__init__(name=name or 'coolamqp/ListenerThread')
         self.daemon = True
         self.name = name or 'CoolAMQP'
         self.terminating = False
         self._call_next_io_event = Callable(oneshots=True)
-        self.listener = None
+        self.listener = None        # type: BaseListener
 
     def call_next_io_event(self, callable):
         """
@@ -32,34 +61,34 @@ class ListenerThread(threading.Thread):
         all these are done.
         :param callable: callable/0
         """
-        self._call_next_io_event()
+        pass
+#        self._call_next_io_event.add(callable) - dummy that out, causes AssertionError to appear
 
     def terminate(self):
         self.terminating = True
 
     def init(self):
         """Called before start. It is not safe to fork after this"""
-        self.listener = EpollListener()
+        listener_class = get_listener_class()
+        logger.info('Using %s as a listener' % (listener_class, ))
+        self.listener = listener_class()
 
     def activate(self, sock):
         self.listener.activate(sock)
 
     def run(self):
-        try:
-            import prctl
-        except ImportError:
-            pass
-        else:
-            prctl.set_name(self.name + ' - AMQP listener thread')
+        prctl_set_name(self.name + '- listener thread')
 
         while not self.terminating:
-            self.listener.wait(timeout=1)
+            self.listener.wait()
+            self._call_next_io_event()
 
         self.listener.shutdown()
 
     def register(self, sock,  # type: socket.socket
                  on_read=lambda data: None,  # type: tp.Callable[[bytes], None]
-                 on_fail=lambda: None):  # type: tp.Callable[[], None]
+                 on_fail=lambda: None      # type: tp.Callable[[], None]
+                 ):
         """
         Add a socket to be listened for by the loop.
 
diff --git a/coolamqp/utils.py b/coolamqp/utils.py
new file mode 100644
index 0000000..976207a
--- /dev/null
+++ b/coolamqp/utils.py
@@ -0,0 +1,19 @@
+
+try:
+    IMPORT_ERRORS = (ModuleNotFoundError, ImportError)
+except NameError:
+    IMPORT_ERRORS = (ImportError, )
+
+try:
+    from time import monotonic
+except IMPORT_ERRORS:
+    from monotonic import monotonic
+
+try:
+    from prctl import set_name as prctl_set_name
+except IMPORT_ERRORS:
+    def prctl_set_name(name):
+        pass
+
+
+__all__ = ['monotonic', 'prctl_set_name']
diff --git a/setup.py b/setup.py
index 92f16b7..5345eca 100644
--- a/setup.py
+++ b/setup.py
@@ -7,13 +7,14 @@ from coolamqp import __version__
 setup(keywords=['amqp', 'rabbitmq', 'client', 'network', 'ha', 'high availability'],
       version=__version__,
       packages=find_packages(include=['coolamqp', 'coolamqp.*']),
-      install_requires=['six', 'monotonic'],
+      install_requires=['six'],
       # per coverage version for codeclimate-reporter
       tests_require=["nose2", "coverage", "nose2[coverage_plugin]"],
       test_suite='nose2.collector.collector',
       extras_require={
-          ':python_version == "2.7"': ['futures', 'typing'],
+          ':python_version == "2.7"': ['futures', 'typing', 'monotonic'],
           'prctl': ['prctl'],
           'opentracing': ['opentracing'],
+          'gevent': ['gevent']
       }
       )
diff --git a/tests/test_clustering/test_a.py b/tests/test_clustering/test_a.py
index 11727c3..4560d98 100644
--- a/tests/test_clustering/test_a.py
+++ b/tests/test_clustering/test_a.py
@@ -9,7 +9,7 @@ import os
 import time
 import unittest
 
-import monotonic
+from coolamqp.utils import monotonic
 import six
 
 from coolamqp.clustering import Cluster, MessageReceived, NothingMuch
@@ -52,11 +52,11 @@ class TestA(unittest.TestCase):
     #        self.assertEquals(rmsg.body, data)
 
     def test_actually_waits(self):
-        a = monotonic.monotonic()
+        a = monotonic()
 
         self.c.drain(5)
 
-        self.assertTrue(monotonic.monotonic() - a >= 4)
+        self.assertGreaterEqual(monotonic() - a, 4)
 
     def test_set_qos_but_later(self):
         con, fut = self.c.consume(Queue(u'hello2', exclusive=True))
diff --git a/tests/utils.py b/tests/utils.py
index abfdc41..02058e9 100644
--- a/tests/utils.py
+++ b/tests/utils.py
@@ -6,8 +6,9 @@ import os
 import socket
 import time
 import unittest
+import six
 
-import monotonic
+from coolamqp.utils import monotonic
 from coolamqp.backends.base import AMQPBackend, ConnectionFailedError
 
 from coolamqp import Cluster, ClusterNode, ConnectionDown, \
@@ -47,9 +48,9 @@ class CoolAMQPTestCase(unittest.TestCase):
     def drainToAny(self, types, timeout, forbidden=[]):
         """Assert that messages with types, in any order, are found within timeout.
         Fail if any type from forbidden is found"""
-        start = monotonic.monotonic()
+        start = monotonic()
         types = set(types)
-        while monotonic.monotonic() - start < timeout:
+        while monotonic() - start < timeout:
             q = self.amqp.drain(1)
             if type(q) in forbidden:
                 self.fail('%s found', type(q))
@@ -73,8 +74,8 @@ class CoolAMQPTestCase(unittest.TestCase):
                     self.fail('Found %s but forbidden', type(p))
             return p
 
-        start = monotonic.monotonic()
-        while monotonic.monotonic() - start < timeout:
+        start = monotonic()
+        while monotonic() - start < timeout:
             q = self.amqp.drain(1)
             if isinstance(q, type_):
                 return q
-- 
GitLab