From 189c947911230dce8da474aa2e80fec9ceb8de4a Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Piotr=20Ma=C5=9Blanka?= <piotr.maslanka@henrietta.com.pl>
Date: Sat, 4 Jan 2020 20:32:35 +0100
Subject: [PATCH] Docker based tests (#45)

---
 README.md                               | 20 ++++++++-
 coolamqp/__init__.py                    |  2 +-
 docker-compose.yml                      | 17 ++++++++
 stress_tests/__main__.py                | 58 +++++++++++++++++--------
 stress_tests/client/__init__.py         |  5 +--
 stress_tests/server/__init__.py         | 12 ++---
 stress_tests/settings.py                |  3 +-
 tests/Dockerfile                        | 18 ++++++++
 tests/run.py                            |  4 +-
 tests/test_clustering/test_a.py         |  2 +-
 tests/test_clustering/test_double.py    |  3 +-
 tests/test_clustering/test_exchanges.py |  3 +-
 tests/test_clustering/test_things.py    |  5 ++-
 tests/utils.py                          |  3 +-
 14 files changed, 115 insertions(+), 40 deletions(-)
 create mode 100644 docker-compose.yml
 create mode 100644 tests/Dockerfile

diff --git a/README.md b/README.md
index ef54832..7fc18bf 100644
--- a/README.md
+++ b/README.md
@@ -93,4 +93,22 @@ A downloaded from OASIS machine-readable AMQP 0.9.1 specification.
 ### [docs](docs/)
 
 Sources for the documentation, available
-[here](https://coolamqp.readthedocs.io/en/latest/).
\ No newline at end of file
+[here](https://coolamqp.readthedocs.io/en/latest/).
+
+## Running unit tests
+
+Unit tests are powered by nose. They require an available AMQP broker.
+If you host the broker other than localhost, set the env *AMQP_HOST* to correct value.
+The default username used is guest, and password is guest.
+
+You can also run unit tests from Docker, if you wish so. To launch the unit test suite:
+
+```bash
+docker-compose up tests
+```
+
+To launch the stress test suite
+
+```bash
+docker-compose up stress_tests
+```
diff --git a/coolamqp/__init__.py b/coolamqp/__init__.py
index ebd3768..44ead02 100644
--- a/coolamqp/__init__.py
+++ b/coolamqp/__init__.py
@@ -1,2 +1,2 @@
 # coding=UTF-8
-__version__ = '0.101a2'
+__version__ = '0.101a3'
diff --git a/docker-compose.yml b/docker-compose.yml
new file mode 100644
index 0000000..e525a5d
--- /dev/null
+++ b/docker-compose.yml
@@ -0,0 +1,17 @@
+version: '3.2'
+services:
+  amqp:
+    image: rabbitmq
+  tests:
+    build:
+      context: .
+      dockerfile: tests/Dockerfile
+    depends_on:
+      - amqp
+  stress_tests:
+    build:
+      context: .
+      dockerfile: tests/Dockerfile
+    command: python -m stress_tests
+    depends_on:
+      - amqp
diff --git a/stress_tests/__main__.py b/stress_tests/__main__.py
index 378e9f4..8258033 100644
--- a/stress_tests/__main__.py
+++ b/stress_tests/__main__.py
@@ -1,27 +1,35 @@
 import logging
 import multiprocessing
+import os
 import sys
 import time
 
+from coolamqp import __version__
+
 logger = logging.getLogger(__name__)
-logging.basicConfig(level=logging.DEBUG)
+logging.basicConfig(level=logging.WARNING)
 
 if __name__ == '__main__':
     if sys.version.startswith('2.7'):
         logger.critical('This will not run on Python 2.x1')
         sys.exit(0)
 
+    logger.warning('Starting stress tests on CoolAMQP v%s', __version__)
+
     from queue import Empty
 
-    notify_client, result_client, notify_server, result_server = multiprocessing.Queue(), multiprocessing.Queue(), multiprocessing.Queue(), multiprocessing.Queue()
+    notify_client, result_client, notify_server, result_server = multiprocessing.Queue(), \
+                                                                 multiprocessing.Queue(), \
+                                                                 multiprocessing.Queue(), \
+                                                                 multiprocessing.Queue()
 
     from .client import run as run_client
     from .server import run as run_server
 
     server = multiprocessing.Process(target=run_server, args=(
-    notify_client, result_client, notify_server, result_server))
+        notify_client, result_client, notify_server, result_server))
     client = multiprocessing.Process(target=run_client, args=(
-    notify_client, result_client, notify_server, result_server))
+        notify_client, result_client, notify_server, result_server))
 
     server.start()
     client.start()
@@ -30,20 +38,36 @@ if __name__ == '__main__':
         time.sleep(40)
     except KeyboardInterrupt:
         pass
+    logger.warning('Finishing up')
 
-    notify_client.put(None)
-    notify_server.put(None)
+    notify_client.put('term')
+    notify_server.put('term')
 
-    try:
-        obj = result_server.get(timeout=1.0)
-        if obj == 'fail':
-            sys.exit(1)
-    except Empty:
-        pass
+    client.join(timeout=5.0)
 
     try:
-        obj = result_client.get(timeout=1.0)
-        if obj == 'fail':
-            sys.exit(1)
-    except Empty:
-        pass
+        try:
+            obj = result_server.get(timeout=1.0)
+            if obj == 'fail':
+                sys.exit(1)
+        except Empty:
+            pass
+
+        try:
+            obj = result_client.get(timeout=1.0)
+            if obj == 'fail':
+                sys.exit(1)
+        except Empty:
+            pass
+    finally:
+        try:
+            if server.is_alive():
+                os.kill(server.pid, 9)
+        except OSError:
+            pass
+
+        try:
+            if client.is_alive():
+                os.kill(client.pid, 9)
+        except OSError:
+            pass
diff --git a/stress_tests/client/__init__.py b/stress_tests/client/__init__.py
index dd108fe..c7d5d5d 100644
--- a/stress_tests/client/__init__.py
+++ b/stress_tests/client/__init__.py
@@ -109,15 +109,12 @@ class ConnectAndDisconnectThread(TerminableThread):
 
 
 def run(client_notify, result_client, server_notify, server_result):
-    logging.basicConfig(level=logging.DEBUG)
+    logging.basicConfig(level=logging.WARNING)
 
     lftf = LogFramesToFile('client.txt')
     amqp = connect(on_fail=result_client, log_frames=lftf)
     cad = ConnectAndDisconnectThread(amqp)
 
-    server_notify.put(None)
-    client_notify.get()
-
     cad.start()
     started_at = time.monotonic()
     terminating = False
diff --git a/stress_tests/server/__init__.py b/stress_tests/server/__init__.py
index ff7c03a..1a2cd26 100644
--- a/stress_tests/server/__init__.py
+++ b/stress_tests/server/__init__.py
@@ -1,10 +1,10 @@
 import logging
 
+from satella.coding.concurrent import TerminableThread
+
 from coolamqp.clustering.events import ReceivedMessage
 from coolamqp.objects import Queue, Message
 
-logger = logging.getLogger(__name__)
-from satella.coding.concurrent import TerminableThread
 from ..settings import queue_names, connect, LogFramesToFile
 
 
@@ -20,22 +20,18 @@ class Server(TerminableThread):
     def loop(self):
         evt = self.amqp.drain(timeout=1.0)
         if isinstance(evt, ReceivedMessage):
-            routing_key = evt.routing_key.tobytes().decode('utf8')
-            routing_key = routing_key.replace('-repl', '')
+            routing_key = evt.routing_key.tobytes().decode('utf8').replace('-repl', '')
             self.amqp.publish(Message(evt.body), routing_key=routing_key)
 
 
 def run(notify_client, result_client, notify_server, server_result):
-    logging.basicConfig(level=logging.DEBUG)
+    logging.basicConfig(level=logging.WARNING)
 
     lftf = LogFramesToFile('server.txt')
 
     amqp = connect(on_fail=server_result, log_frames=lftf)
     server = Server(amqp)
 
-    notify_client.put(None)
-    notify_server.get()
-
     server.start()
 
     try:
diff --git a/stress_tests/settings.py b/stress_tests/settings.py
index aa36066..e0cc6b8 100644
--- a/stress_tests/settings.py
+++ b/stress_tests/settings.py
@@ -1,11 +1,12 @@
 import logging
+import os
 
 from coolamqp.clustering import Cluster
 from coolamqp.objects import NodeDefinition
 
 logger = logging.getLogger(__name__)
 
-NODE = NodeDefinition('127.0.0.1', 'guest', 'guest', heartbeat=20)
+NODE = NodeDefinition(os.environ.get('AMQP_HOST', '127.0.0.1'), 'guest', 'guest', heartbeat=20)
 logging.basicConfig(level=logging.DEBUG)
 
 
diff --git a/tests/Dockerfile b/tests/Dockerfile
new file mode 100644
index 0000000..8ceb9a9
--- /dev/null
+++ b/tests/Dockerfile
@@ -0,0 +1,18 @@
+FROM python:3.7
+
+ADD requirements.txt /tmp/requirements.txt
+ADD stress_tests/requirements.txt /tmp/requirements2.txt
+RUN pip install -r /tmp/requirements.txt && \
+    pip install -r /tmp/requirements2.txt && \
+    pip install nose mock coverage
+
+ADD . /coolamqp
+WORKDIR /coolamqp
+
+ENV AMQP_HOST=amqp
+
+# for those pesky builds on Windows
+RUN chmod -R ugo-x /coolamqp
+
+CMD python setup.py nosetests --tests tests
+
diff --git a/tests/run.py b/tests/run.py
index 8a9f1a0..2e02c0f 100644
--- a/tests/run.py
+++ b/tests/run.py
@@ -6,9 +6,9 @@ from coolamqp.objects import Message, MessageProperties, NodeDefinition, Queue,
 from coolamqp.exceptions import AMQPError
 from coolamqp.clustering import Cluster
 
-import time
+import os
 
-NODE = NodeDefinition('127.0.0.1', 'guest', 'guest', heartbeat=20)
+NODE = NodeDefinition(os.environ.get('AMQP_HOST', '127.0.0.1'), 'guest', 'guest', heartbeat=20)
 logging.basicConfig(level=logging.DEBUG)
 
 amqp = Cluster([NODE])
diff --git a/tests/test_clustering/test_a.py b/tests/test_clustering/test_a.py
index 39aee90..d841838 100644
--- a/tests/test_clustering/test_a.py
+++ b/tests/test_clustering/test_a.py
@@ -16,7 +16,7 @@ from coolamqp.clustering import Cluster, MessageReceived, NothingMuch
 from coolamqp.objects import Message, NodeDefinition, Queue, \
     ReceivedMessage, Exchange
 
-NODE = NodeDefinition('127.0.0.1', 'guest', 'guest', heartbeat=20)
+NODE = NodeDefinition(os.environ.get('AMQP_HOST', '127.0.0.1'), 'guest', 'guest', heartbeat=20)
 logging.basicConfig(level=logging.DEBUG)
 
 
diff --git a/tests/test_clustering/test_double.py b/tests/test_clustering/test_double.py
index e05267a..4e97f94 100644
--- a/tests/test_clustering/test_double.py
+++ b/tests/test_clustering/test_double.py
@@ -5,12 +5,13 @@ from __future__ import print_function, absolute_import, division
 import logging
 import time
 import unittest
+import os
 
 from coolamqp.clustering import Cluster
 from coolamqp.exceptions import AMQPError, RESOURCE_LOCKED
 from coolamqp.objects import NodeDefinition, Queue
 
-NODE = NodeDefinition('127.0.0.1', 'guest', 'guest', heartbeat=20)
+NODE = NodeDefinition(os.environ.get('AMQP_HOST', '127.0.0.1'), 'guest', 'guest', heartbeat=20)
 
 logging.basicConfig(level=logging.DEBUG)
 
diff --git a/tests/test_clustering/test_exchanges.py b/tests/test_clustering/test_exchanges.py
index 65b0287..0838ad8 100644
--- a/tests/test_clustering/test_exchanges.py
+++ b/tests/test_clustering/test_exchanges.py
@@ -2,6 +2,7 @@
 from __future__ import print_function, absolute_import, division
 import six
 import unittest
+import os
 import time, logging, threading
 from coolamqp.objects import Message, MessageProperties, NodeDefinition, Queue, \
     ReceivedMessage, Exchange
@@ -10,7 +11,7 @@ from coolamqp.exceptions import AMQPError
 import time
 
 # todo handle bad auth
-NODE = NodeDefinition('127.0.0.1', 'guest', 'guest', heartbeat=20)
+NODE = NodeDefinition(os.environ.get('AMQP_HOST', '127.0.0.1'), 'guest', 'guest', heartbeat=20)
 logging.basicConfig(level=logging.DEBUG)
 
 
diff --git a/tests/test_clustering/test_things.py b/tests/test_clustering/test_things.py
index 061bf43..7b10221 100644
--- a/tests/test_clustering/test_things.py
+++ b/tests/test_clustering/test_things.py
@@ -2,6 +2,7 @@
 
 from __future__ import print_function, absolute_import, division
 import six
+import os
 import unittest
 import time, logging, threading, monotonic, warnings
 from coolamqp.objects import Message, MessageProperties, NodeDefinition, Queue, \
@@ -10,7 +11,7 @@ from coolamqp.clustering import Cluster, MessageReceived, NothingMuch
 
 import time
 
-NODE = NodeDefinition('127.0.0.1', 'guest', 'guest', heartbeat=20)
+NODE = NodeDefinition(os.environ.get('AMQP_HOST', '127.0.0.1'), 'guest', 'guest', heartbeat=20)
 logging.basicConfig(level=logging.DEBUG)
 
 
@@ -18,7 +19,7 @@ class TestConnecting(unittest.TestCase):
     def test_on_fail(self):
         q = {'failed': False}
         c = Cluster(
-            NodeDefinition('127.0.0.1', 'xguest', 'xguest', heartbeat=20),
+            NodeDefinition(os.environ.get('AMQP_HOST', '127.0.0.1'), 'xguest', 'xguest', heartbeat=20),
             on_fail=lambda: q.update(failed=True))
         c.start()
         time.sleep(5)
diff --git a/tests/utils.py b/tests/utils.py
index a311e14..4b60f58 100644
--- a/tests/utils.py
+++ b/tests/utils.py
@@ -5,6 +5,7 @@ import time
 import socket
 import collections
 import monotonic
+import os
 
 from coolamqp import Cluster, ClusterNode, ConnectionUp, ConnectionDown, \
     ConnectionUp, ConsumerCancelled
@@ -12,7 +13,7 @@ from coolamqp.backends.base import AMQPBackend, ConnectionFailedError
 
 
 def getamqp():
-    amqp = Cluster([ClusterNode('127.0.0.1', 'guest', 'guest')], extra_properties=[
+    amqp = Cluster([ClusterNode(os.environ.get('AMQP_HOST', '127.0.0.1'), 'guest', 'guest')], extra_properties=[
         (b'mode', (b'Testing', 'S')),
     ])
     amqp.start()
-- 
GitLab