diff --git a/.codeclimate.yml b/.codeclimate.yml index 493477c212be35db17ed3a2732477606ce80b57a..4cff32f19bc61786b142976064a837fb03c58728 100644 --- a/.codeclimate.yml +++ b/.codeclimate.yml @@ -18,6 +18,7 @@ exclude_paths: - tests/** - coolamqp/framing/definitions.py - compile_definitions.py +- stress_tests/** ratings: paths: - coolamqp/** diff --git a/.coveragerc b/.coveragerc index 17131187d3687dec32408c46e87f17a7d09df260..6fd5d12dd702258b282ccfe01d28000f02787400 100644 --- a/.coveragerc +++ b/.coveragerc @@ -4,5 +4,6 @@ include= coolamqp/* omit= tests/* + stress_tests/* compile_definitions.py coolamqp/framing/definitions.py diff --git a/.gitignore b/.gitignore index 59d7707e0826078e2efb60980c3ef641397df0dd..867741b9e6b2755d3c64fe45bd6514726a6f5618 100644 --- a/.gitignore +++ b/.gitignore @@ -13,6 +13,8 @@ coolamqp/framing/definitions.py env/ build/ docs/_build/ +client.txt +server.txt develop-eggs/ htmlcov/ dist/ diff --git a/.travis.yml b/.travis.yml index bbb1f2d4d2c78c6ed42e85816e3ceeb466b0e53c..bb011f28fc98b12fdee5e2f578e1bc30f691fef2 100644 --- a/.travis.yml +++ b/.travis.yml @@ -8,9 +8,11 @@ python: - "pypy3.5" script: - python compile_definitions.py - - python setup.py nosetests + - python setup.py nosetests --tests tests + - python -m stress_tests install: - pip install -r requirements.txt + - pip install -r stress_tests/requirements.txt - pip install --force-reinstall "coverage>=4.0,<4.4" codeclimate-test-reporter after_success: - codeclimate-test-reporter diff --git a/CHANGELOG.md b/CHANGELOG.md index ee58d84ec4511e9a24e4fd14d629cbdcd18ed54c..6e1410df86d3fc130d40f62a63aba3fe6d6b2587 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,7 @@ +# v0.98: + +* *bugfix release*: fixed multiple race conditions, added stress tests + # v0.97: * Changed copyright in connection properties to better reflect the current situation diff --git a/coolamqp/attaches/consumer.py b/coolamqp/attaches/consumer.py index e13fd5912d66fd05ca70b5425ef6a3ea31837477..2f7b5588b0cffd96b9f8592266fe94f84c9435dd 100644 --- a/coolamqp/attaches/consumer.py +++ b/coolamqp/attaches/consumer.py @@ -4,7 +4,6 @@ from __future__ import absolute_import, division, print_function import io import logging import uuid - from concurrent.futures import Future from coolamqp.attaches.channeler import Channeler, ST_ONLINE, ST_OFFLINE @@ -133,6 +132,7 @@ class Consumer(Channeler): self.receiver = None # MessageReceiver instance self.attache_group = None # attache group this belongs to. + self.channel_close_sent = False # for avoiding situations where ChannelClose is sent twice # if this is not None, then it has an attribute # on_cancel_customer(Consumer instance) self.qos = _qosify(qos) @@ -186,11 +186,13 @@ class Consumer(Channeler): # you'll blow up big next time you try to use this consumer if you # can't cancel, but just close if self.consumer_tag is not None: - self.method_and_watch(BasicCancel(self.consumer_tag, False), - [BasicCancelOk], - self.on_close) + if not self.channel_close_sent and self.state == ST_ONLINE: + self.method_and_watch(BasicCancel(self.consumer_tag, False), + [BasicCancelOk], + self.on_close) else: - self.method(ChannelClose(0, b'cancelling', 0, 0)) + if not self.channel_close_sent and self.state == ST_ONLINE: + self.method(ChannelClose(0, b'cancelling', 0, 0)) if self.attache_group is not None: self.attache_group.on_cancel_customer(self) @@ -201,6 +203,7 @@ class Consumer(Channeler): super(Consumer, self).on_operational(operational) if operational: + self.channel_close_sent = False self.receiver = MessageReceiver(self) # notify the future @@ -249,8 +252,10 @@ class Consumer(Channeler): # on_close is a one_shot watch. We need to re-register it now. self.register_on_close_watch() - self.methods([BasicCancelOk(payload.consumer_tag), - ChannelClose(0, b'Received basic.cancel', 0, 0)]) + if not self.channel_close_sent: + self.methods([BasicCancelOk(payload.consumer_tag), + ChannelClose(0, b'Received basic.cancel', 0, 0)]) + self.channel_close_sent = True self.cancelled = True # wasn't I? self.on_cancel() self.on_broker_cancel() @@ -259,7 +264,9 @@ class Consumer(Channeler): if isinstance(payload, BasicCancelOk): # OK, our cancelling went just fine - proceed with teardown self.register_on_close_watch() - self.method(ChannelClose(0, b'Received basic.cancel-ok', 0, 0)) + if not self.channel_close_sent: + self.method(ChannelClose(0, b'Received basic.cancel-ok', 0, 0)) + self.channel_close_sent = True return if isinstance(payload, ChannelClose): @@ -425,6 +432,12 @@ class Consumer(Channeler): self.state = ST_ONLINE + if self.cancelled: + self.method(ChannelClose(0, b'Received basic.cancel-ok', 0, 0)) + self.channel_close_sent = True + self.state = ST_OFFLINE + return + # resend QoS, in case of sth if self.qos is not None: self.set_qos(self.qos[0], self.qos[1]) diff --git a/coolamqp/clustering/cluster.py b/coolamqp/clustering/cluster.py index 9a508df0c2b63210733bd94af88a655e144c801f..f85e04e46508d15b37fc04434daa7cc505347b59 100644 --- a/coolamqp/clustering/cluster.py +++ b/coolamqp/clustering/cluster.py @@ -3,20 +3,22 @@ THE object you interface with """ from __future__ import print_function, absolute_import, division -import six + import logging -import warnings import time -import monotonic -from coolamqp.uplink import ListenerThread -from coolamqp.clustering.single import SingleNodeReconnector -from coolamqp.attaches import Publisher, AttacheGroup, Consumer, Declarer -from coolamqp.objects import Exchange -from coolamqp.exceptions import ConnectionDead +import warnings from concurrent.futures import Future +import monotonic +import six + +from coolamqp.attaches import Publisher, AttacheGroup, Consumer, Declarer from coolamqp.clustering.events import ConnectionLost, MessageReceived, \ NothingMuch +from coolamqp.clustering.single import SingleNodeReconnector +from coolamqp.exceptions import ConnectionDead +from coolamqp.objects import Exchange +from coolamqp.uplink import ListenerThread logger = logging.getLogger(__name__) @@ -38,7 +40,7 @@ class Cluster(object): ST_LINK_LOST = 0 # Link has been lost ST_LINK_REGAINED = 1 # Link has been regained - def __init__(self, nodes, on_fail=None, extra_properties=None): + def __init__(self, nodes, on_fail=None, extra_properties=None, log_frames=None): """ :param nodes: list of nodes, or a single node. For now, only one is supported. :type nodes: NodeDefinition instance or a list of NodeDefinition instances @@ -47,6 +49,9 @@ class Cluster(object): :type on_fail: callable/0 :param extra_properties: refer to documentation in [/coolamqp/connection/connection.py] Connection.__init__ + :param log_frames: an object that will have it's method .on_frame(timestamp, + frame, direction) called upon receiving/sending a frame. Timestamp is UNIX timestamp, + frame is AMQPFrame, direction is one of 'to_client', 'to_server' """ from coolamqp.objects import NodeDefinition if isinstance(nodes, NodeDefinition): @@ -57,6 +62,7 @@ class Cluster(object): self.node, = nodes self.extra_properties = extra_properties + self.log_frames = log_frames if on_fail is not None: def decorated(): @@ -170,7 +176,7 @@ class Cluster(object): raise NotImplementedError( u'Sorry, this functionality is not yet implemented!') - def start(self, wait=True, timeout=10.0): + def start(self, wait=True, timeout=10.0, log_frames=False): """ Connect to broker. Initialize Cluster. @@ -181,6 +187,7 @@ class Cluster(object): :param timeout: timeout to wait until the connection is ready. If it is not, a ConnectionDead error will be raised :raise RuntimeError: called more than once :raise ConnectionDead: failed to connect within timeout + :param log_frames: whether to keep a log of sent/received frames in self.log_frames """ try: @@ -197,7 +204,8 @@ class Cluster(object): self.events = six.moves.queue.Queue() # for coolamqp.clustering.events.* self.snr = SingleNodeReconnector(self.node, self.attache_group, - self.listener, self.extra_properties) + self.listener, self.extra_properties, + self.log_frames) self.snr.on_fail.add(lambda: self.events.put_nowait(ConnectionLost())) if self.on_fail is not None: self.snr.on_fail.add(self.on_fail) diff --git a/coolamqp/clustering/single.py b/coolamqp/clustering/single.py index 656b8869f5290805e36466074579a1a05df43a34..17663273bbc9aa70968b9cf67a2c0785914099b4 100644 --- a/coolamqp/clustering/single.py +++ b/coolamqp/clustering/single.py @@ -1,10 +1,10 @@ # coding=UTF-8 from __future__ import print_function, absolute_import, division -import six + import logging -from coolamqp.uplink import Connection from coolamqp.objects import Callable +from coolamqp.uplink import Connection logger = logging.getLogger(__name__) @@ -14,12 +14,14 @@ class SingleNodeReconnector(object): Connection to one node. It will do it's best to remain alive. """ - def __init__(self, node_def, attache_group, listener_thread, extra_properties=None): + def __init__(self, node_def, attache_group, listener_thread, extra_properties=None, + log_frames=None): self.listener_thread = listener_thread self.node_def = node_def self.attache_group = attache_group self.connection = None self.extra_properties = extra_properties + self.log_frames = log_frames self.terminating = False @@ -34,7 +36,9 @@ class SingleNodeReconnector(object): assert self.connection is None # Initiate connecting - this order is very important! - self.connection = Connection(self.node_def, self.listener_thread, self.extra_properties) + self.connection = Connection(self.node_def, self.listener_thread, + extra_properties=self.extra_properties, + log_frames=self.log_frames) self.attache_group.attach(self.connection) self.connection.start(timeout) self.connection.finalize.add(self.on_fail) diff --git a/coolamqp/framing/base.py b/coolamqp/framing/base.py index 89f057568963c32c49f0bc38a7b2f22fa54fcae5..6b224dcb73cf5069e632d59ceb45723502396088 100644 --- a/coolamqp/framing/base.py +++ b/coolamqp/framing/base.py @@ -95,6 +95,9 @@ class AMQPContentPropertyList(object): # todo they are immutable, so they could just serialize themselves... + def __str__(self): + return '<AMQPContentPropertyList>' + def get(self, property_name, default=None): """ Return a particular property, or default if not defined diff --git a/coolamqp/framing/frames.py b/coolamqp/framing/frames.py index d5b8736820f25977477bf2aa474a38085583d5b0..092f0b820cb7c7d652c21e6baed24a3260e1dee6 100644 --- a/coolamqp/framing/frames.py +++ b/coolamqp/framing/frames.py @@ -5,6 +5,7 @@ Concrete frame definitions from __future__ import absolute_import, division, print_function import struct + import six from coolamqp.framing.base import AMQPFrame @@ -24,6 +25,9 @@ class AMQPMethodFrame(AMQPFrame): AMQPFrame.__init__(self, channel) self.payload = payload + def __str__(self): + return 'AMQPMethodFrame(%s, %s)' % (self.channel, self.payload) + def write_to(self, buf): if self.payload.IS_CONTENT_STATIC: buf.write(struct.pack('!BH', FRAME_METHOD, self.channel)) @@ -91,6 +95,10 @@ class AMQPHeaderFrame(AMQPFrame): # frame header is always 7, frame end is 1, content header is 12 + props return 20 + self.properties.get_size() + def __str__(self): + return 'AMQPHeaderFrame(%s, %s, %s, %s, %s)' % (self.channel, self.class_id, self.weight, + self.body_size, self.properties) + class AMQPBodyFrame(AMQPFrame): FRAME_TYPE = FRAME_BODY @@ -118,6 +126,9 @@ class AMQPBodyFrame(AMQPFrame): def get_size(self): return 8 + len(self.data) + def __str__(self): + return '<AMQPBodyFrame of size %s>' % (len(self.data),) + class AMQPHeartbeatFrame(AMQPFrame): FRAME_TYPE = FRAME_HEARTBEAT @@ -132,3 +143,6 @@ class AMQPHeartbeatFrame(AMQPFrame): def get_size(self): return AMQPHeartbeatFrame.LENGTH + + def __str__(self): + return 'AMQPHeartbeatFrame()' diff --git a/coolamqp/uplink/connection/connection.py b/coolamqp/uplink/connection/connection.py index 95c742b0aba2f3a097b49a19b9f365be72d49148..e12e3d2c208bb78d36bcc21dada245e26b9e5ef7 100644 --- a/coolamqp/uplink/connection/connection.py +++ b/coolamqp/uplink/connection/connection.py @@ -1,23 +1,24 @@ # coding=UTF-8 from __future__ import absolute_import, division, print_function -import logging + import collections -import monotonic -import uuid -import time +import logging import socket -import six +import time +import uuid + +import monotonic from coolamqp.exceptions import ConnectionDead +from coolamqp.framing.definitions import ConnectionClose, ConnectionCloseOk +from coolamqp.framing.frames import AMQPMethodFrame +from coolamqp.objects import Callable from coolamqp.uplink.connection.recv_framer import ReceivingFramer from coolamqp.uplink.connection.send_framer import SendingFramer -from coolamqp.framing.frames import AMQPMethodFrame -from coolamqp.uplink.handshake import Handshaker -from coolamqp.framing.definitions import ConnectionClose, ConnectionCloseOk -from coolamqp.uplink.connection.watches import MethodWatch, Watch from coolamqp.uplink.connection.states import ST_ONLINE, ST_OFFLINE, \ ST_CONNECTING -from coolamqp.objects import Callable +from coolamqp.uplink.connection.watches import MethodWatch +from coolamqp.uplink.handshake import Handshaker logger = logging.getLogger(__name__) @@ -78,7 +79,10 @@ class Connection(object): This logger is talkative mostly on INFO, and regarding connection state """ - def __init__(self, node_definition, listener_thread, extra_properties): + def __init__(self, node_definition, # type: coolamqp.objects.NodeDefinition + listener_thread, extra_properties, # type: tp.Dict[bytes, tp.Tuple[tp.Any, str]] + log_frames=None + ): """ Create an object that links to an AMQP broker. @@ -115,6 +119,9 @@ class Connection(object): self.listener_socket = None self.sendf = None + # To log frames + self.log_frames = log_frames + def call_on_connected(self, callable): """ Register a callable to be called when this links to the server. @@ -237,6 +244,10 @@ class Connection(object): :param frames: list of frames or None to close the link :param reason: optional human-readable reason for this action """ + if self.log_frames is not None: + for frame in frames: + self.log_frames.on_frame(time.monotonic(), frame, 'to_server') + if frames is not None: # for frame in frames: # if isinstance(frame, AMQPMethodFrame): @@ -257,6 +268,9 @@ class Connection(object): :param frame: AMQPFrame that was received """ + if self.log_frames is not None: + self.log_frames.on_frame(time.monotonic(), frame, 'to_client') + watch_handled = False # True if ANY watch handled this if isinstance(frame, AMQPMethodFrame): diff --git a/coolamqp/uplink/handshake.py b/coolamqp/uplink/handshake.py index a40874e5520a9ec2386a1e0824cc101f383ac0d8..d207fa4bfd1e0449d596c2a507922395bec9f714 100644 --- a/coolamqp/uplink/handshake.py +++ b/coolamqp/uplink/handshake.py @@ -5,6 +5,7 @@ from __future__ import absolute_import, division, print_function Provides reactors that can authenticate an AQMP session """ import six +import typing as tp import copy from coolamqp.framing.definitions import ConnectionStart, ConnectionStartOk, \ ConnectionTune, ConnectionTuneOk, ConnectionOpen, ConnectionOpenOk @@ -24,7 +25,7 @@ CLIENT_DATA = [ # because RabbitMQ is some kind of a fascist and does not allow # these fields to be of type short-string (b'product', (b'CoolAMQP', 'S')), - (b'version', (b'0.97a1', 'S')), + (b'version', (b'0.98', 'S')), (b'copyright', (b'Copyright (C) 2016-2019 SMOK sp. z o.o.', 'S')), ( b'information', ( @@ -42,7 +43,11 @@ class Handshaker(object): Object that given a connection rolls the handshake. """ - def __init__(self, connection, node_definition, on_success, extra_properties=None): + def __init__(self, connection, # type: coolamqp.uplink.connection.Connection + node_definition, # type: coolamqp.objects.NodeDefinition + on_success, # type: tp.Callable[[], None] + extra_properties=None # type: tp.Dict[bytes, tp.Tuple[tp.Any, str]] + ): """ :param connection: Connection instance to use :type node_definition: NodeDefinition @@ -72,7 +77,8 @@ class Handshaker(object): # closing the connection this way will get to Connection by channels of ListenerThread self.connection.send(None) - def on_connection_start(self, payload): + def on_connection_start(self, payload # type: coolamqp.framing.base.AMQPPayload + ): sasl_mechanisms = payload.mechanisms.tobytes().split(b' ') locale_supported = payload.locales.tobytes().split(b' ') @@ -103,7 +109,8 @@ class Handshaker(object): )) ]) - def on_connection_tune(self, payload): + def on_connection_tune(self, payload # type: coolamqp.framing.base.AMQPPayload + ): self.connection.frame_max = payload.frame_max self.connection.heartbeat = min(payload.heartbeat, self.heartbeat) for channel in six.moves.xrange(1, ( @@ -124,5 +131,6 @@ class Handshaker(object): from coolamqp.uplink.heartbeat import Heartbeater Heartbeater(self.connection, self.connection.heartbeat) - def on_connection_open_ok(self, payload): + def on_connection_open_ok(self, payload # type: coolamqp.framing.base.AMQPPayload + ): self.on_success() diff --git a/coolamqp/uplink/heartbeat.py b/coolamqp/uplink/heartbeat.py index 66b0b7b591da8eeaa39e92b9c5053a4718858312..fcc3bae6cb95ae7eae415e733cf8eff0d56d1f98 100644 --- a/coolamqp/uplink/heartbeat.py +++ b/coolamqp/uplink/heartbeat.py @@ -1,6 +1,8 @@ # coding=UTF-8 from __future__ import absolute_import, division, print_function +import typing as tp + import monotonic from coolamqp.framing.frames import AMQPHeartbeatFrame @@ -12,7 +14,9 @@ class Heartbeater(object): An object that handles heartbeats """ - def __init__(self, connection, heartbeat_interval=0): + def __init__(self, connection, # type: coolamqp.uplink.connection.Connection + heartbeat_interval=0 # type: tp.Union[int, float] + ): self.connection = connection self.heartbeat_interval = heartbeat_interval @@ -44,7 +48,7 @@ class Heartbeater(object): self.connection.send([AMQPHeartbeatFrame()], priority=True) if ( - monotonic.monotonic() - self.last_heartbeat_on) > 2 * self.heartbeat_interval: + monotonic.monotonic() - self.last_heartbeat_on) > 2 * self.heartbeat_interval: # closing because of heartbeat self.connection.send(None) diff --git a/setup.cfg b/setup.cfg index 4553be3f616520757b2fb049167f0a1eb8b29cf9..ec4cc06fcccd5f80ff91eb697913dd83b9ad8b5d 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,7 +1,7 @@ [metadata] description-file = README.md name = CoolAMQP -version = 0.97a2 +version = 0.98 license = MIT License classifiers = Programming Language :: Python diff --git a/stress_tests/README.md b/stress_tests/README.md new file mode 100644 index 0000000000000000000000000000000000000000..a1305012ad322545b4456793c98878dba09ef518 --- /dev/null +++ b/stress_tests/README.md @@ -0,0 +1,6 @@ +Stress tests for CoolAMQP +========================= + +This module consistutes a stress test for CoolAMQP. + +To be ran in Travis or your local Vagrant. diff --git a/stress_tests/__init__.py b/stress_tests/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/stress_tests/__main__.py b/stress_tests/__main__.py new file mode 100644 index 0000000000000000000000000000000000000000..adba2f12c89131f65889ad65d2addb8895acf7bf --- /dev/null +++ b/stress_tests/__main__.py @@ -0,0 +1,52 @@ +import logging +import multiprocessing +import sys +import time + +logger = logging.getLogger(__name__) +logging.basicConfig(level=logging.DEBUG) + +if __name__ == '__main__': + if sys.version.startswith('2.7'): + logger.critical('This will not run on Python 2.x1') + sys.exit(0) + + from queue import Empty + + notify_client, result_client, notify_server, result_server = multiprocessing.Queue(), multiprocessing.Queue(), multiprocessing.Queue(), multiprocessing.Queue() + + from .client import run as run_client + from .server import run as run_server + + server = multiprocessing.Process(target=run_server, args=( + notify_client, result_client, notify_server, result_server)) + client = multiprocessing.Process(target=run_client, args=( + notify_client, result_client, notify_server, result_server)) + + server.start() + client.start() + + try: + time.sleep(40) + except KeyboardInterrupt: + pass + + notify_client.put(None) + notify_server.put(None) + + server.join() + client.join() + + try: + obj = result_server.get(timeout=1.0) + if obj == 'fail': + sys.exit(1) + except Empty: + pass + + try: + obj = result_client.get(timeout=1.0) + if obj == 'fail': + sys.exit(1) + except Empty: + pass diff --git a/stress_tests/client/__init__.py b/stress_tests/client/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..5fc64fd93986477ea81cb9a55474a75fceb21ad3 --- /dev/null +++ b/stress_tests/client/__init__.py @@ -0,0 +1,134 @@ +import logging +import random +import time +import typing as tp +import uuid +from collections import deque +from queue import Empty + +from satella.coding.concurrent import TerminableThread + +from coolamqp.clustering.events import ReceivedMessage, NothingMuch +from coolamqp.objects import Queue, Message +from ..settings import connect, queue_names, LogFramesToFile + +logger = logging.getLogger(__name__) + +MESSAGES_PER_SECOND_PER_CONNECTION = 0.5 +CONNECTIONS_PER_SECOND = 0.9 +DISCONNECTS_PER_SECOND_PER_CONNECTION = 0.1 +ANSWER_PROBABILITY = 0.7 + + +def run_multiple_if_probability(probability: float, callable: tp.Callable[[], None]) -> None: + prob = random.random() + while prob < probability: + try: + callable() + except Exception as e: + return + prob = random.random() + + +def run_if_probability(probability: float, callable: tp.Callable[[], None]) -> None: + if random.random() < probability: + try: + return callable() + except Exception as e: + pass + + +class Connection: + """ + This binds to queues called forth in settings and pushes messages to this name + "-repl" + """ + + CONNECTION_COUNTER = 0 + + def __init__(self, cad_thread: 'ConnectAndDisconnectThread'): + """:raises ValueError: not more free names available""" + self.cad_thread = cad_thread + try: + self.name = self.cad_thread.free_names.popleft() + except IndexError: + logger.warning('Ran out of free names') + raise ValueError('Cannot create a connection %s - ran out of free names', + Connection.CONNECTION_COUNTER) + self.consumer, future = cad_thread.amqp.consume(Queue(self.name)) + self.terminated = False + Connection.CONNECTION_COUNTER += 1 + cad_thread.connections[self.name] = self + + def cancel(self): + self.consumer.cancel() + self.cad_thread.free_names.append(self.name) + self.terminated = True + + def process(self): + if not self.terminated: + run_if_probability(MESSAGES_PER_SECOND_PER_CONNECTION, self._send) + run_if_probability(DISCONNECTS_PER_SECOND_PER_CONNECTION, self.cancel) + + def _send(self): + self.cad_thread.amqp.publish(Message(uuid.uuid4().hex.encode('utf8')), + routing_key=self.name + '-repl') + + def on_message(self, msg: ReceivedMessage): + run_if_probability(ANSWER_PROBABILITY, self._send) + + +class ConnectAndDisconnectThread(TerminableThread): + def __init__(self, amqp): + self.amqp = amqp + super().__init__() + self.free_names = deque(queue_names) + self.connections = {} # type: tp.Dict[str, Connection] + + def loop(self) -> None: + started_at = time.monotonic() + run_multiple_if_probability(CONNECTIONS_PER_SECOND, lambda: Connection(self)) + + for connection in self.connections.values(): + connection.process() + + self.connections = {name: connection for name, connection in self.connections.items() if + not connection.terminated} + + evt = None + while not isinstance(evt, NothingMuch): + evt = self.amqp.drain(max(0.0, 1 - (time.monotonic() - started_at))) + + if isinstance(evt, ReceivedMessage): + routing_key = evt.routing_key.tobytes().decode('utf8') + if routing_key in self.connections: + self.connections[routing_key].on_message(evt) + if evt.ack is not None: + evt.ack() + + time.sleep(max(0.0, 1 - (time.monotonic() - started_at))) + + +def run(client_notify, result_client, server_notify, server_result): + logging.basicConfig(level=logging.DEBUG) + + lftf = LogFramesToFile('client.txt') + amqp = connect(on_fail=result_client, log_frames=lftf) + cad = ConnectAndDisconnectThread(amqp) + + server_notify.put(None) + client_notify.get() + + cad.start() + + terminating = False + while not terminating: + try: + client_notify.get(timeout=1.0) + terminating = True + except Empty: + time.sleep(1) + except KeyboardInterrupt: + break + + lftf.close() + # logger.warning('Got %s connections', len(cad.connections)) diff --git a/stress_tests/requirements.txt b/stress_tests/requirements.txt new file mode 100644 index 0000000000000000000000000000000000000000..35cb25ca33709b70f2fce062592a7905e4da8cd5 --- /dev/null +++ b/stress_tests/requirements.txt @@ -0,0 +1 @@ +satella diff --git a/stress_tests/server/__init__.py b/stress_tests/server/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..ff7c03afe4dcd3d1ae26c7519d299b498b5882a6 --- /dev/null +++ b/stress_tests/server/__init__.py @@ -0,0 +1,48 @@ +import logging + +from coolamqp.clustering.events import ReceivedMessage +from coolamqp.objects import Queue, Message + +logger = logging.getLogger(__name__) +from satella.coding.concurrent import TerminableThread +from ..settings import queue_names, connect, LogFramesToFile + + +class Server(TerminableThread): + def __init__(self, amqp): + self.amqp = amqp + super().__init__() + self.consumers = [] + for queue_name in queue_names: + cons, fut = self.amqp.consume(Queue(queue_name + '-repl')) + self.consumers.append(cons) + + def loop(self): + evt = self.amqp.drain(timeout=1.0) + if isinstance(evt, ReceivedMessage): + routing_key = evt.routing_key.tobytes().decode('utf8') + routing_key = routing_key.replace('-repl', '') + self.amqp.publish(Message(evt.body), routing_key=routing_key) + + +def run(notify_client, result_client, notify_server, server_result): + logging.basicConfig(level=logging.DEBUG) + + lftf = LogFramesToFile('server.txt') + + amqp = connect(on_fail=server_result, log_frames=lftf) + server = Server(amqp) + + notify_client.put(None) + notify_server.get() + + server.start() + + try: + notify_server.get() + except KeyboardInterrupt: + pass + + server.terminate().join() + + lftf.close() diff --git a/stress_tests/settings.py b/stress_tests/settings.py new file mode 100644 index 0000000000000000000000000000000000000000..aa36066b609da40f1df284de4cb0b331ac2896c1 --- /dev/null +++ b/stress_tests/settings.py @@ -0,0 +1,33 @@ +import logging + +from coolamqp.clustering import Cluster +from coolamqp.objects import NodeDefinition + +logger = logging.getLogger(__name__) + +NODE = NodeDefinition('127.0.0.1', 'guest', 'guest', heartbeat=20) +logging.basicConfig(level=logging.DEBUG) + + +def connect(on_fail=lambda: None, log_frames=None): + def _on_fail(): + on_fail.put('fail') + + amqp = Cluster([NODE], on_fail=_on_fail, log_frames=log_frames) + amqp.start(wait=True) + return amqp + + +class LogFramesToFile: + def __init__(self, path): + self.file = open(path, 'w') + + def close(self): + self.file.close() + + def on_frame(self, timestamp, frame, direction): + self.file.write('%s %s %s\n' % (timestamp, frame, direction)) + self.file.flush() + + +queue_names = (str(v) for v in range(100))