Skip to content
Snippets Groups Projects
Commit 6277d852 authored by Piotr Maślanka's avatar Piotr Maślanka
Browse files

orders will be finished upon terminating

parent 22611f5b
No related branches found
No related tags found
No related merge requests found
...@@ -84,60 +84,63 @@ class ClusterHandlerThread(threading.Thread): ...@@ -84,60 +84,63 @@ class ClusterHandlerThread(threading.Thread):
break # we connected :) break # we connected :)
def perform_order(self):
order = self.order_queue.popleft()
try:
if isinstance(order, SendMessage):
self.backend.basic_publish(order.message, order.exchange, order.routing_key)
elif isinstance(order, DeclareExchange):
self.backend.exchange_declare(order.exchange)
self.declared_exchanges[order.exchange.name] = order.exchange
elif isinstance(order, DeleteExchange):
self.backend.exchange_delete(order.exchange)
if order.exchange.name in self.declared_exchanges:
del self.declared_exchanges[order.exchange.name]
elif isinstance(order, DeleteQueue):
self.backend.queue_delete(order.queue)
elif isinstance(order, ConsumeQueue):
self.backend.queue_declare(order.queue)
if order.queue.exchange is not None:
if isinstance(order.queue.exchange, Exchange):
self.backend.queue_bind(order.queue, order.queue.exchange)
else:
for exchange in order.queue.exchange:
self.backend.queue_bind(order.queue, order.queue.exchange)
self.backend.basic_consume(order.queue)
self.queues_by_consumer_tags[order.queue.consumer_tag] = order.queue
elif isinstance(order, CancelQueue):
try:
q = self.queues_by_consumer_tags.pop(order.queue.consumer_tag)
except KeyError:
pass # wat?
else:
self.backend.basic_cancel(order.queue.consumer_tag)
self.event_queue.put(ConsumerCancelled(order.queue))
elif isinstance(order, AcknowledgeMessage):
if order.connect_id == self.connect_id:
self.backend.basic_ack(order.delivery_tag)
elif isinstance(order, NAcknowledgeMessage):
if order.connect_id == self.connect_id:
self.backend.basic_nack(order.delivery_tag)
except RemoteAMQPError as e:
logger.error('Remote AMQP error: %s', e)
order.failed(e) # we are allowed to go on
except ConnectionFailedError:
self.order_queue.appendleft(order)
raise
else:
order.completed()
def run(self): def run(self):
self._reconnect() self._reconnect()
while not self.is_terminating: while (not self.is_terminating) or len(self.order_queue) > 0:
try: try:
while len(self.order_queue) > 0: while len(self.order_queue) > 0:
order = self.order_queue.popleft() self.perform_order()
try:
if isinstance(order, SendMessage):
self.backend.basic_publish(order.message, order.exchange, order.routing_key)
elif isinstance(order, DeclareExchange):
self.backend.exchange_declare(order.exchange)
self.declared_exchanges[order.exchange.name] = order.exchange
elif isinstance(order, DeleteExchange):
self.backend.exchange_delete(order.exchange)
if order.exchange.name in self.declared_exchanges:
del self.declared_exchanges[order.exchange.name]
elif isinstance(order, DeleteQueue):
self.backend.queue_delete(order.queue)
elif isinstance(order, ConsumeQueue):
self.backend.queue_declare(order.queue)
if order.queue.exchange is not None:
if isinstance(order.queue.exchange, Exchange):
self.backend.queue_bind(order.queue, order.queue.exchange)
else:
for exchange in order.queue.exchange:
self.backend.queue_bind(order.queue, order.queue.exchange)
self.backend.basic_consume(order.queue)
self.queues_by_consumer_tags[order.queue.consumer_tag] = order.queue
elif isinstance(order, CancelQueue):
try:
q = self.queues_by_consumer_tags.pop(order.queue.consumer_tag)
except KeyError:
pass # wat?
else:
self.backend.basic_cancel(order.queue.consumer_tag)
self.event_queue.put(ConsumerCancelled(order.queue))
elif isinstance(order, AcknowledgeMessage):
if order.connect_id == self.connect_id:
self.backend.basic_ack(order.delivery_tag)
elif isinstance(order, NAcknowledgeMessage):
if order.connect_id == self.connect_id:
self.backend.basic_nack(order.delivery_tag)
except RemoteAMQPError as e:
logger.error('Remote AMQP error: %s', e)
order.failed(e) # we are allowed to go on
except ConnectionFailedError:
self.order_queue.appendleft(order)
raise
else:
order.completed()
# just drain shit # just drain shit
self.backend.process(max_time=2) self.backend.process(max_time=2)
...@@ -148,7 +151,6 @@ class ClusterHandlerThread(threading.Thread): ...@@ -148,7 +151,6 @@ class ClusterHandlerThread(threading.Thread):
self.event_queue.put(ConnectionDown()) self.event_queue.put(ConnectionDown())
self._reconnect() self._reconnect()
def terminate(self): def terminate(self):
""" """
Called by Cluster. Tells to finish all jobs and quit. Called by Cluster. Tells to finish all jobs and quit.
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment