diff --git a/coolamqp/handler.py b/coolamqp/handler.py index 4bd88f5c11964ff709f4443c22d9554953497bf8..15d0ec517844d8a1099ab434c570adb6edc29390 100644 --- a/coolamqp/handler.py +++ b/coolamqp/handler.py @@ -84,60 +84,63 @@ class ClusterHandlerThread(threading.Thread): break # we connected :) + def perform_order(self): + order = self.order_queue.popleft() + + try: + if isinstance(order, SendMessage): + self.backend.basic_publish(order.message, order.exchange, order.routing_key) + elif isinstance(order, DeclareExchange): + self.backend.exchange_declare(order.exchange) + self.declared_exchanges[order.exchange.name] = order.exchange + elif isinstance(order, DeleteExchange): + self.backend.exchange_delete(order.exchange) + if order.exchange.name in self.declared_exchanges: + del self.declared_exchanges[order.exchange.name] + elif isinstance(order, DeleteQueue): + self.backend.queue_delete(order.queue) + elif isinstance(order, ConsumeQueue): + self.backend.queue_declare(order.queue) + + if order.queue.exchange is not None: + if isinstance(order.queue.exchange, Exchange): + self.backend.queue_bind(order.queue, order.queue.exchange) + else: + for exchange in order.queue.exchange: + self.backend.queue_bind(order.queue, order.queue.exchange) + + self.backend.basic_consume(order.queue) + self.queues_by_consumer_tags[order.queue.consumer_tag] = order.queue + elif isinstance(order, CancelQueue): + try: + q = self.queues_by_consumer_tags.pop(order.queue.consumer_tag) + except KeyError: + pass # wat? + else: + self.backend.basic_cancel(order.queue.consumer_tag) + self.event_queue.put(ConsumerCancelled(order.queue)) + elif isinstance(order, AcknowledgeMessage): + if order.connect_id == self.connect_id: + self.backend.basic_ack(order.delivery_tag) + elif isinstance(order, NAcknowledgeMessage): + if order.connect_id == self.connect_id: + self.backend.basic_nack(order.delivery_tag) + except RemoteAMQPError as e: + logger.error('Remote AMQP error: %s', e) + order.failed(e) # we are allowed to go on + except ConnectionFailedError: + self.order_queue.appendleft(order) + raise + else: + order.completed() + def run(self): self._reconnect() - while not self.is_terminating: + while (not self.is_terminating) or len(self.order_queue) > 0: try: while len(self.order_queue) > 0: - order = self.order_queue.popleft() - - try: - if isinstance(order, SendMessage): - self.backend.basic_publish(order.message, order.exchange, order.routing_key) - elif isinstance(order, DeclareExchange): - self.backend.exchange_declare(order.exchange) - self.declared_exchanges[order.exchange.name] = order.exchange - elif isinstance(order, DeleteExchange): - self.backend.exchange_delete(order.exchange) - if order.exchange.name in self.declared_exchanges: - del self.declared_exchanges[order.exchange.name] - elif isinstance(order, DeleteQueue): - self.backend.queue_delete(order.queue) - elif isinstance(order, ConsumeQueue): - self.backend.queue_declare(order.queue) - - if order.queue.exchange is not None: - if isinstance(order.queue.exchange, Exchange): - self.backend.queue_bind(order.queue, order.queue.exchange) - else: - for exchange in order.queue.exchange: - self.backend.queue_bind(order.queue, order.queue.exchange) - - self.backend.basic_consume(order.queue) - self.queues_by_consumer_tags[order.queue.consumer_tag] = order.queue - elif isinstance(order, CancelQueue): - try: - q = self.queues_by_consumer_tags.pop(order.queue.consumer_tag) - except KeyError: - pass # wat? - else: - self.backend.basic_cancel(order.queue.consumer_tag) - self.event_queue.put(ConsumerCancelled(order.queue)) - elif isinstance(order, AcknowledgeMessage): - if order.connect_id == self.connect_id: - self.backend.basic_ack(order.delivery_tag) - elif isinstance(order, NAcknowledgeMessage): - if order.connect_id == self.connect_id: - self.backend.basic_nack(order.delivery_tag) - except RemoteAMQPError as e: - logger.error('Remote AMQP error: %s', e) - order.failed(e) # we are allowed to go on - except ConnectionFailedError: - self.order_queue.appendleft(order) - raise - else: - order.completed() + self.perform_order() # just drain shit self.backend.process(max_time=2) @@ -148,7 +151,6 @@ class ClusterHandlerThread(threading.Thread): self.event_queue.put(ConnectionDown()) self._reconnect() - def terminate(self): """ Called by Cluster. Tells to finish all jobs and quit.