-
Piotr Maślanka authoredUnverified0b7f2380
client.py 4.41 KiB
import logging
import random
import time
import typing as tp
import uuid
from collections import deque
from queue import Empty
from satella.coding.concurrent import TerminableThread, ThreadCollection
from coolamqp.clustering.events import ReceivedMessage, NothingMuch
from coolamqp.objects import Queue, Message
from .settings import connect, queue_names, LogFramesToFile
logger = logging.getLogger(__name__)
MESSAGES_PER_SECOND_PER_CONNECTION = 0.5
CONNECTIONS_PER_SECOND = 0.9
DISCONNECTS_PER_SECOND_PER_CONNECTION = 0.1
ANSWER_PROBABILITY = 0.7
RUNNING_INTERVAL = 120 # run for 120 seconds
def run_multiple_if_probability(probability: float, callable: tp.Callable[[], None]) -> None:
prob = random.random()
while prob < probability:
try:
callable()
except Exception as e:
return
prob = random.random()
def run_if_probability(probability: float, callable: tp.Callable[[], None]) -> None:
if random.random() < probability:
try:
return callable()
except Exception as e:
pass
class Connection:
"""
This binds to queues called forth in settings and pushes messages to this name + "-repl"
"""
CONNECTION_COUNTER = 0
def __init__(self, cad_thread: 'ConnectAndDisconnectThread'):
""":raises ValueError: not more free names available"""
self.cad_thread = cad_thread
try:
self.name = self.cad_thread.free_names.popleft()
except IndexError:
raise ValueError('Cannot create a connection %s - ran out of free names',
Connection.CONNECTION_COUNTER)
self.consumer, future = cad_thread.amqp.consume(Queue(self.name))
self.terminated = False
Connection.CONNECTION_COUNTER += 1
cad_thread.connections[self.name] = self
def cancel(self):
self.consumer.cancel()
self.cad_thread.free_names.append(self.name)
self.terminated = True
def process(self):
if not self.terminated:
run_if_probability(MESSAGES_PER_SECOND_PER_CONNECTION, self._send)
run_if_probability(DISCONNECTS_PER_SECOND_PER_CONNECTION, self.cancel)
def _send(self):
self.cad_thread.amqp.publish(Message(uuid.uuid4().hex.encode('utf8')),
routing_key=self.name + '-repl')
def on_message(self, msg: ReceivedMessage):
run_if_probability(ANSWER_PROBABILITY, self._send)
class ConnectAndDisconnectThread(TerminableThread):
def __init__(self, amqp):
self.amqp = amqp
super().__init__()
self.free_names = deque(queue_names)
self.connections = {} # type: tp.Dict[str, Connection]
def loop(self) -> None:
started_at = time.monotonic()
run_multiple_if_probability(CONNECTIONS_PER_SECOND, lambda: Connection(self))
for connection in self.connections.values():
connection.process()
self.connections = {name: connection for name, connection in self.connections.items() if
not connection.terminated}
evt = None
while not isinstance(evt, NothingMuch):
evt = self.amqp.drain(max(0.0, 1 - (time.monotonic() - started_at)))
if isinstance(evt, ReceivedMessage):
routing_key = evt.routing_key.tobytes().decode('utf8')
if routing_key in self.connections:
self.connections[routing_key].on_message(evt)
if evt.ack is not None:
evt.ack()
time.sleep(max(0.0, 1 - (time.monotonic() - started_at)))
def run(client_notify, result_client, server_notify, server_result):
logging.basicConfig(level=logging.WARNING)
lftf = LogFramesToFile('client.txt')
amqp = connect(on_fail=result_client, log_frames=lftf)
tc = ThreadCollection()
for i in range(3):
cad = ConnectAndDisconnectThread(amqp)
tc.add(cad)
tc.start()
started_at = time.monotonic()
terminating = False
while not terminating and (time.monotonic() < started_at + RUNNING_INTERVAL): # run for however long is required
try:
client_notify.get(timeout=1.0)
terminating = True
except Empty:
time.sleep(1)
except KeyboardInterrupt:
break
tc.terminate().join()
server_notify.put(None)
lftf.close()
# logger.warning('Got %s connections', len(cad.connections))