# coding=UTF-8 from __future__ import print_function, absolute_import, division import logging import os import time import unittest from coolamqp.clustering import Cluster from coolamqp.exceptions import ConnectionDead from coolamqp.objects import NodeDefinition, Queue, Exchange, Message NODE = NodeDefinition(os.environ.get('AMQP_HOST', '127.0.0.1'), 'guest', 'guest', heartbeat=20) logging.basicConfig(level=logging.DEBUG) logging.getLogger('coolamqp').setLevel(logging.DEBUG) logger = logging.getLogger(__name__) class TestConnecting(unittest.TestCase): def test_argumented_exchange(self): c = Cluster(NODE) c.start(wait=True, timeout=None) if c.properties.version[0] < 4: c.shutdown(wait=True) return stream = Queue('my-stream', durable=True, auto_delete=False, exclusive=False, arguments={'x-queue-type': 'stream'}) c.declare(stream) for i in range(10): c.publish(Message(('dupa%s' % (i, )).encode('utf-8')), routing_key='my-stream', confirm=True).result() test = {'a': 0} def handle_msg(msg): test['a'] += 1 cons, fut = c.consume(stream, qos=10, on_message=handle_msg, arguments={'x-stream-offset': '3'}, no_ack=True) fut.result() logger.warning(repr(test)) time.sleep(3) cons.cancel() cons, fut = c.consume(stream, qos=10, on_message=handle_msg, arguments={'x-stream-offset': 'first'}, no_ack=True) fut.result() time.sleep(3) cons.cancel() logger.warning(repr(test))