diff --git a/coolamqp/attaches/agroup.py b/coolamqp/attaches/agroup.py index 05a43b95db422440c2fb5c19fff033df7a9370bc..0eaee7d6b176282154bba977660a0d3ffeb6785f 100644 --- a/coolamqp/attaches/agroup.py +++ b/coolamqp/attaches/agroup.py @@ -64,7 +64,10 @@ class AttacheGroup(Attache): for attache in self.attaches: if not attache.cancelled: - if attache.connection != connection: - attache.attach(connection) + print('Attaching', attache) + attache.attach(connection) + else: + print('lol wut') + raise Exception diff --git a/coolamqp/attaches/channeler.py b/coolamqp/attaches/channeler.py index 405ab75eb589c25767fce2ff318f6ba0b7d70947..f0997514b5077d45ff28e072ad90a795eac992a3 100644 --- a/coolamqp/attaches/channeler.py +++ b/coolamqp/attaches/channeler.py @@ -73,6 +73,7 @@ class Channeler(Attache): :param connection: Connection instance to use """ super(Channeler, self).attach(connection) + assert self.connection is not None connection.call_on_connected(self.on_uplink_established) # ------- event handlers @@ -178,6 +179,7 @@ class Channeler(Attache): def on_uplink_established(self): """Called by connection. Connection reports being ready to do things.""" + assert self.connection is not None self.state = ST_SYNCING self.channel_id = self.connection.free_channels.pop() diff --git a/coolamqp/attaches/publisher.py b/coolamqp/attaches/publisher.py index 10e93c9d4a60319e441fdffa381237a805d46648..217693d6309f9a4f0afb074b86dab779f1addfe1 100644 --- a/coolamqp/attaches/publisher.py +++ b/coolamqp/attaches/publisher.py @@ -12,6 +12,7 @@ from __future__ import absolute_import, division, print_function import collections import logging +import struct import warnings from coolamqp.framing.definitions import ChannelOpenOk, BasicPublish, Basic, BasicAck @@ -25,7 +26,7 @@ except ImportError: from coolamqp.attaches.channeler import Channeler, ST_ONLINE, ST_OFFLINE from coolamqp.uplink import PUBLISHER_CONFIRMS, MethodWatch, FailWatch -from coolamqp.attaches.utils import AtomicTagger, FutureConfirmableRejectable +from coolamqp.attaches.utils import AtomicTagger, FutureConfirmableRejectable, Synchronized from coolamqp.objects import Future @@ -37,7 +38,7 @@ CnpubMessageSendOrder = collections.namedtuple('CnpubMessageSendOrder', ('messag 'routing_key', 'future')) -class Publisher(Channeler): +class Publisher(Channeler, Synchronized): """ An object that is capable of sucking into a Connection and sending messages. Depending on it's characteristic, it may process messages in: @@ -51,6 +52,10 @@ class Publisher(Channeler): and emit a warning. Other modes may be added in the future. + + Since this may be called by other threads than ListenerThread, this has locking. + + _pub and on_fail are synchronized so that _pub doesn't see a partially destroyed class. """ MODE_NOACK = 0 # no-ack publishing MODE_CNPUB = 1 # RabbitMQ publisher confirms extension @@ -66,7 +71,9 @@ class Publisher(Channeler): :type mode: MODE_NOACK or MODE_CNPUB :raise ValueError: mode invalid """ - super(Publisher, self).__init__() + Channeler.__init__(self) + Synchronized.__init__(self) + if mode not in (Publisher.MODE_NOACK, Publisher.MODE_CNPUB): raise ValueError(u'Invalid publisher mode') @@ -82,16 +89,22 @@ class Publisher(Channeler): super(Publisher, self).attach(connection) connection.watch(FailWatch(self.on_fail)) + @Synchronized.synchronized def on_fail(self): """ Registered as a fail watch for connection """ self.state = ST_OFFLINE self.connection = None + print('Publisher is FAILED') + @Synchronized.synchronized def _pub(self, message, exchange_name, routing_key): """ - Just send the message. Sends BasicDeliver + header + body + Just send the message. Sends BasicDeliver + header + body. + + BECAUSE OF publish THIS CAN GET CALLED BY FOREIGN THREAD. + :param message: Message instance :param exchange_name: exchange to use :param routing_key: routing key to use diff --git a/coolamqp/attaches/utils.py b/coolamqp/attaches/utils.py index 5ea2981eebdc8c6865b9d311506f5a4e893b0016..9989ca77f47edc846386c44312849dbd9348372c 100644 --- a/coolamqp/attaches/utils.py +++ b/coolamqp/attaches/utils.py @@ -3,6 +3,7 @@ from __future__ import print_function, absolute_import, division import six import logging import threading +import functools logger = logging.getLogger(__name__) @@ -203,3 +204,30 @@ class AtomicTagger(object): with self.lock: self.next_tag += 1 return self.next_tag - 1 + + +class Synchronized(object): + """ + I have a lock and can sync on it. Use like: + + class Synced(Synchronized): + + @synchronized + def mandatorily_a_instance_method(self, ...): + ... + + """ + + def __init__(self): + self._monitor_lock = threading.Lock() + + @staticmethod + def synchronized(fun): + @functools.wraps(fun) + def monitored(*args, **kwargs): + with args[0]._monitor_lock: + return fun(*args, **kwargs) + + return monitored + + diff --git a/coolamqp/persistence/__init__.py b/coolamqp/persistence/__init__.py index c26370fd050633248a085b288d3ce07f44823179..f4baa6193a4cd21cad73ff3680c50a3b27ddc64c 100644 --- a/coolamqp/persistence/__init__.py +++ b/coolamqp/persistence/__init__.py @@ -10,7 +10,7 @@ from __future__ import print_function, absolute_import, division import six import logging -from coolamqp.uplink import FailWatch, Connection +from coolamqp.uplink import Connection logger = logging.getLogger(__name__) @@ -31,13 +31,13 @@ class SingleNodeReconnector(object): def connect(self): assert self.connection is None - # Initiate connecting + # Initiate connecting - this order is very important! self.connection = Connection(self.node_def, self.listener_thread) - self.connection.start() - self.connection.watch(FailWatch(self.on_fail)) self.attache_group.attach(self.connection) + self.connection.start() + self.connection.add_finalizer(self.on_fail) def on_fail(self): - logger.info('Reconnecting...') + print('I am failed, but will recover!') self.connection = None self.connect() diff --git a/coolamqp/uplink/connection/connection.py b/coolamqp/uplink/connection/connection.py index 79c8d831ca9bb4e5b4b2f2a51fbe3c207f803287..965e1ca5c60640e124a698bdf75442e939667029 100644 --- a/coolamqp/uplink/connection/connection.py +++ b/coolamqp/uplink/connection/connection.py @@ -2,6 +2,7 @@ from __future__ import absolute_import, division, print_function import logging import collections +import time import socket import six @@ -52,6 +53,9 @@ class Connection(object): self.watches = {} # channel => list of [Watch instance] self.any_watches = [] # list of Watches that should check everything + self.finalizers = [] + + self.state = ST_CONNECTING self.callables_on_connected = [] # list of callable/0 @@ -87,10 +91,21 @@ class Connection(object): """ Start processing events for this connect. Create the socket, transmit 'AMQP\x00\x00\x09\x01' and roll. + + Warning: This will block for as long as the TCP connection setup takes. """ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock.connect((self.node_definition.host, self.node_definition.port)) + + while True: + try: + sock.connect((self.node_definition.host, self.node_definition.port)) + except socket.error as e: + print(e) + time.sleep(0.5) # Connection refused? Very bad things? + else: + break + sock.settimeout(0) sock.send('AMQP\x00\x00\x09\x01') @@ -101,6 +116,19 @@ class Connection(object): self.sendf = SendingFramer(self.listener_socket.send) self.watch_for_method(0, (ConnectionClose, ConnectionCloseOk), self.on_connection_close) + def add_finalizer(self, callable): + """ + Add a callable to be executed when all watches were failed and we're really going down. + + Finalizers are not used for logic stuff, but for situations like making TCP reconnects. + When we are making a reconnect, we need to be sure that all watches fired - so logic is intact. + + DO NOT PUT CALLABLES THAT HAVE TO DO WITH STATE THINGS, ESPECIALLY ATTACHES. + + :param callable: callable/0 + """ + self.finalizers.append(callable) + def on_fail(self): """ Called by event loop when the underlying connection is closed. @@ -128,12 +156,17 @@ class Connection(object): self.watches = {} # Clear the watch list self.any_watches = [] + # call finalizers + for finalizer in self.finalizers: + finalizer() + def on_connection_close(self, payload): """ Server attempted to close the connection.. or maybe we did? Called by ListenerThread. """ + print('We are GOING DOOOWN') self.on_fail() # it does not make sense to prolong the agony if isinstance(payload, ConnectionClose): @@ -153,11 +186,6 @@ class Connection(object): :param reason: optional human-readable reason for this action """ if frames is not None: - for frame in frames: - if isinstance(frame, AMQPMethodFrame): - print('Sending ', frame.payload) - else: - print('Sending ', frame) self.sendf.send(frames, priority=priority) else: # Listener socket will kill us when time is right diff --git a/coolamqp/uplink/connection/watches.py b/coolamqp/uplink/connection/watches.py index 1160fe36366d52ff088051eb9d45f805e94a706f..10026c88df42b88f4a201db3fba7ecb15b924fbd 100644 --- a/coolamqp/uplink/connection/watches.py +++ b/coolamqp/uplink/connection/watches.py @@ -56,7 +56,7 @@ class AnyWatch(Watch): as it wants. """ def __init__(self, callable): - Watch.__init__(self, None, False) + super(AnyWatch, self).__init__(None, False) self.callable = callable def is_triggered_by(self, frame): @@ -69,16 +69,14 @@ class FailWatch(Watch): A special kind of watch that fires when connection has died """ def __init__(self, callable): - Watch.__init__(self, None, True) + super(FailWatch, self).__init__(None, True) self.callable = callable def is_triggered_by(self, frame): return False - def fire(self): - """ - Connection failed! - """ + def failed(self): + """Connection failed!""" self.callable() diff --git a/coolamqp/uplink/heartbeat.py b/coolamqp/uplink/heartbeat.py index bb17fc0dff5cc5c1317c39eed91b47328382c453..3f37ba89a3b1fedc33de7f977b8bed40bf8fde58 100644 --- a/coolamqp/uplink/heartbeat.py +++ b/coolamqp/uplink/heartbeat.py @@ -21,7 +21,6 @@ class Heartbeater(object): self.connection.watch(AnyWatch(self.on_heartbeat)) def on_heartbeat(self, frame): - print('Heart Beat!') self.last_heartbeat_on = monotonic.monotonic() def on_any_frame(self): @@ -42,11 +41,9 @@ class Heartbeater(object): def on_timer(self): """Timer says we should send a heartbeat""" self.connection.send([AMQPHeartbeatFrame()], priority=True) - print('Timer') if (monotonic.monotonic() - self.last_heartbeat_on) > 2*self.heartbeat_interval: # closing because of heartbeat - print('TERMINATING BECAUSE NO HEARTBEAT!!!!') self.connection.send(None) self.connection.watchdog(self.heartbeat_interval, self.on_timer) diff --git a/coolamqp/uplink/listener/epoll_listener.py b/coolamqp/uplink/listener/epoll_listener.py index 6e2ccf3a011d6b5a84fba6c347e6f9137a744d16..a27095e0e1cbc46535385b1970afd9e1c29c5f58 100644 --- a/coolamqp/uplink/listener/epoll_listener.py +++ b/coolamqp/uplink/listener/epoll_listener.py @@ -28,8 +28,15 @@ class EpollSocket(BaseSocket): self.priority_queue = collections.deque() def send(self, data, priority=False): + """ + This can actually get called not by ListenerThread. + """ BaseSocket.send(self, data, priority=priority) - self.listener.epoll.modify(self, RW) + try: + self.listener.epoll.modify(self, RW) + except socket.error: + # silence. If there are errors, it's gonna get nuked soon. + pass def oneshot(self, seconds_after, callable): """ diff --git a/coolamqp/uplink/listener/socket.py b/coolamqp/uplink/listener/socket.py index 5a1fa4575a46e614737a1a559acb5bb0c9d1b97d..ffd56372ee8eddb44ad45b938c227060f819de42 100644 --- a/coolamqp/uplink/listener/socket.py +++ b/coolamqp/uplink/listener/socket.py @@ -54,6 +54,13 @@ class BaseSocket(object): :param priority: preempt other datas. Property of sending data atomically will be maintained. """ if self.is_failed: return + + if data is None: + # THE POPE OF NOPE + self.priority_queue = collections.deque() + self.data_to_send = collections.deque([None]) + return + if priority: self.priority_queue.append(data) else: diff --git a/tests/run.py b/tests/run.py index 994b4b390292ac4078dcb6697e6be4f3e64df07e..ae3171b5a6cc18b08e8c4678f6186fec0a974a6c 100644 --- a/tests/run.py +++ b/tests/run.py @@ -36,6 +36,7 @@ if __name__ == '__main__': while True: pub2.publish(Message(b'you dawg', properties=MessageProperties(content_type='text/plain')), routing_key=b'siema-eniu') + time.sleep(0.1) IPublishThread(ag).start()